RollingCurl: Best Practices for Concurrency in PHP


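In real projects, or when writing small tools of your own (news aggregators, product price monitors, price comparison), you often need to fetch data from third-party websites or API endpoints. When there is a queue of URLs to process, the curl_multi_* family of functions provided by cURL offers a simple way to run the requests concurrently and improve performance.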


1. The classic cURL concurrency mechanism (classicCurl) and its problems

The classic curl_multi implementation is easy to find online, for example the following approach based on the PHP online manual:

    function classicCurl(array $curls, $timeout = 3)
    {
        $now = microtime(true);
        $mud = memory_get_usage();
        $queue = curl_multi_init();
        $map = [];
        $ret = [];
        foreach ($curls as $curl) {
            // Skip entries without a key or a URL
            if (empty($curl['key']) || empty($curl['url'])) {
                continue;
            }
            // Create the handle
            $ch = curl_init();
            // Request method
            if (isset($curl['type']) && strtolower($curl['type']) === 'post') {
                curl_setopt($ch, CURLOPT_POST, true);
                if (!empty($curl['params']) && is_array($curl['params'])) {
                    curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($curl['params'], '', '&'));
                }
            } else {
                if (!empty($curl['params']) && is_array($curl['params'])) {
                    $curl['url'] .= '?' . http_build_query($curl['params'], '', '&');
                }
            }
            // Set the URL
            curl_setopt($ch, CURLOPT_URL, $curl['url']);
            // Do not include response headers in the output
            curl_setopt($ch, CURLOPT_HEADER, false);
            // Return the response as a string instead of printing it
            curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
            // Set the timeout
            curl_setopt($ch, CURLOPT_TIMEOUT, isset($curl['timeout']) ? $curl['timeout'] : $timeout);
            // When true, ignore any cURL function that sends a signal to the PHP process.
            // Enabled by default in multi-threaded SAPIs so that timeout options still work.
            curl_setopt($ch, CURLOPT_NOSIGNAL, true);
            // Add the handle to the queue
            curl_multi_add_handle($queue, $ch);
            // Keep a key => handle map to make later processing easier
            $map[$curl['key']] = $ch;
        }
        // Written this way to avoid high CPU usage and requests that appear to hang
        do {
            while (($mrc = curl_multi_exec($queue, $active)) == CURLM_CALL_MULTI_PERFORM) ;
            if ($active && curl_multi_select($queue) == -1) {
                usleep(1);
            }
        } while ($active && $mrc == CURLM_OK);
        foreach ($map as $key => $ch) {
            // Read the response content from the handle
            $ret[$key] = ['error' => curl_error($ch), 'result' => curl_multi_getcontent($ch)];
            // Remove and close the completed handle
            curl_multi_remove_handle($queue, $ch);
            curl_close($ch);
        }
        // Close the multi handle
        curl_multi_close($queue);
        return ['memory' => round((memory_get_usage() - $mud) / 1024, 6) . 'kb',
            'spend' => sprintf('%.6fs', microtime(true) - $now), 'result' => $ret];
    }

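It first pushes every URL onto the concurrent queue, then runs the transfers, and only after all responses have been received does it move on to parsing and other follow-up processing.
In practice, because of network conditions, some URLs return their content earlier than others, yet the classic approach has to wait for the slowest URL before any processing starts.
Waiting means idle, wasted CPU time. With a short URL queue this idle time is still acceptable, but with a long queue the waiting and waste become unacceptable.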
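
To make the cost concrete, here is a small arithmetic sketch that makes no network calls. It assumes hypothetical response latencies and a fixed per-response processing cost; the numbers and the function names classicFinishTime and rollingFinishTime are illustrative and do not come from the implementations in this article. The model is deliberately simplified: it treats processing as a single sequential worker and requires a non-empty list of latencies.

```php
<?php
// Hypothetical model: each request has a network latency (seconds), and each
// response costs a fixed amount of time to process once it has arrived.

// Classic approach: nothing is processed until the slowest response is in.
function classicFinishTime(array $latencies, float $processCost): float
{
    return max($latencies) + count($latencies) * $processCost;
}

// Process-on-arrival approach: each response is processed as soon as it has
// arrived and the previous response has been fully processed.
function rollingFinishTime(array $latencies, float $processCost): float
{
    sort($latencies);
    $clock = 0.0;
    foreach ($latencies as $arrival) {
        $clock = max($clock, $arrival) + $processCost;
    }
    return $clock;
}

$latencies = [0.2, 0.5, 3.0]; // assumed sample latencies, in seconds
printf(
    "classic: %.1fs, on arrival: %.1fs\n",
    classicFinishTime($latencies, 0.5),
    rollingFinishTime($latencies, 0.5)
);
```

With these assumed numbers, the two fast responses are fully processed while the slow one is still in flight, so only the last response's processing time is added after the slowest transfer completes.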

2. The improved RollingCurl approach

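A closer look shows that the classic approach leaves room for optimization: process each URL as soon as its request completes, and keep waiting for the remaining URLs while processing, instead of waiting for the slowest endpoint before starting any work. This avoids leaving the CPU idle.
Without further ado, here is the implementation: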

    function rollingCurl(array $curls, $timeout = 3)
    {
        $now = microtime(true);
        $mud = memory_get_usage();
        $map = [];
        $ret = [];
        // Create the cURL multi handle
        $queue = curl_multi_init();
        foreach ($curls as $curl) {
            // Skip entries without a key or a URL
            if (empty($curl['key']) || empty($curl['url'])) {
                continue;
            }
            // Create the handle
            $ch = curl_init();
            // Request method
            if (isset($curl['type']) && strtolower($curl['type']) === 'post') {
                curl_setopt($ch, CURLOPT_POST, true);
                if (!empty($curl['params']) && is_array($curl['params'])) {
                    curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($curl['params'], '', '&'));
                }
            } else {
                if (!empty($curl['params']) && is_array($curl['params'])) {
                    $curl['url'] .= '?' . http_build_query($curl['params'], '', '&');
                }
            }
            // Set the URL
            curl_setopt($ch, CURLOPT_URL, $curl['url']);
            // Do not include response headers in the output
            curl_setopt($ch, CURLOPT_HEADER, false);
            // Return the response as a string instead of printing it
            curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
            // Set the timeout
            curl_setopt($ch, CURLOPT_TIMEOUT, isset($curl['timeout']) ? $curl['timeout'] : $timeout);
            // When true, ignore any cURL function that sends a signal to the PHP process.
            // Enabled by default in multi-threaded SAPIs so that timeout options still work.
            curl_setopt($ch, CURLOPT_NOSIGNAL, true);
            // Add the handle to the queue
            curl_multi_add_handle($queue, $ch);
            // Map the handle to its key and callback. Handles are objects since PHP 8
            // and resources before that, so derive an integer id that works for both.
            $id = is_object($ch) ? spl_object_id($ch) : (int)$ch;
            $map[$id] = ['key' => $curl['key'], 'callback' => isset($curl['callback']) ? $curl['callback'] : null];
        }
        do {
            while (($mrc = curl_multi_exec($queue, $active)) == CURLM_CALL_MULTI_PERFORM) ;
            // One or more requests may have completed; find out which
            while ($done = curl_multi_info_read($queue)) {
                $id = is_object($done['handle']) ? spl_object_id($done['handle']) : (int)$done['handle'];
                if (!isset($map[$id])) {
                    continue;
                }
                $curl = $map[$id];
                // Get the error string and the content returned by the request
                $error = curl_error($done['handle']);
                $result = curl_multi_getcontent($done['handle']);
                if (isset($curl['callback'])) {
                    $result = $curl['callback']($result, $error);
                }
                // Elapsed time since the start, measured once this request has been handled
                $spend = sprintf('%.6fs', microtime(true) - $now);
                $ret[$curl['key']] = compact('spend', 'error', 'result');
                // Remove and close the handle that just completed
                curl_multi_remove_handle($queue, $done['handle']);
                curl_close($done['handle']);
            }
            // Block until there is activity on a connection; errors are reported by curl_multi_exec
            if ($active && curl_multi_select($queue) == -1) {
                usleep(1);
            }
        } while ($active && $mrc == CURLM_OK);
        // Close the multi handle
        curl_multi_close($queue);
        return ['memory' => round((memory_get_usage() - $mud) / 1024, 6) . 'kb',
            'spend' => sprintf('%.6fs', microtime(true) - $now), 'result' => $ret];
    }
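
rollingCurl invokes each entry's optional callback with the raw response body and the cURL error string, and stores whatever the callback returns as that entry's result. Below is a minimal sketch of a callback that follows this contract. The name decodeJsonCallback and its fallback behaviour (null on a transport error, the raw body when the response is not valid JSON) are assumptions made for illustration, not part of the original code.

```php
<?php
// Hypothetical callback for rollingCurl(): receives the raw response body and
// the cURL error string, and returns the value to store under 'result'.
function decodeJsonCallback($result, $error = '')
{
    // On a transport error there is no usable body to decode.
    if ($error !== '' || !is_string($result)) {
        return null;
    }
    $decoded = json_decode($result, true);
    // If the body is not valid JSON, keep the raw string instead.
    return json_last_error() === JSON_ERROR_NONE ? $decoded : $result;
}

var_dump(decodeJsonCallback('{"code":0}'));      // decoded into an array
var_dump(decodeJsonCallback('not json'));        // raw string is kept
var_dump(decodeJsonCallback('', 'timed out'));   // null because of the error
```

Such a function could be passed as 'callback' => 'decodeJsonCallback' in a request entry. Keeping it free of slow work matters, because it runs inside the polling loop.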


To see what happens at each step, the following debug variant records a timestamped trace:

    function debugRollingCurl(array $urls, $timeout = 3)
    {
        $now = microtime(true);
        $mud = memory_get_usage();
        $map = [];
        $ret = [];
        // Create the cURL multi handle
        $queue = curl_multi_init();
        foreach ($urls as $curl) {
            // Skip entries without a key or a URL
            if (empty($curl['key']) || empty($curl['url'])) {
                continue;
            }
            // Create the handle
            $ch = curl_init();
            // Request method
            if (isset($curl['type']) && strtolower($curl['type']) === 'post') {
                curl_setopt($ch, CURLOPT_POST, true);
                if (!empty($curl['params']) && is_array($curl['params'])) {
                    curl_setopt($ch, CURLOPT_POSTFIELDS, http_build_query($curl['params'], '', '&'));
                }
            } else {
                if (!empty($curl['params']) && is_array($curl['params'])) {
                    $curl['url'] .= '?' . http_build_query($curl['params'], '', '&');
                }
            }
            // Set the URL
            curl_setopt($ch, CURLOPT_URL, $curl['url']);
            // Do not include response headers in the output
            curl_setopt($ch, CURLOPT_HEADER, false);
            // Return the response as a string instead of printing it
            curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
            // Set the timeout
            curl_setopt($ch, CURLOPT_TIMEOUT, isset($curl['timeout']) ? $curl['timeout'] : $timeout);
            // When true, ignore any cURL function that sends a signal to the PHP process.
            // Enabled by default in multi-threaded SAPIs so that timeout options still work.
            curl_setopt($ch, CURLOPT_NOSIGNAL, true);
            // Add the handle to the queue
            curl_multi_add_handle($queue, $ch);
            // Map the handle to its key and callback. Handles are objects since PHP 8
            // and resources before that, so derive an integer id that works for both.
            $id = is_object($ch) ? spl_object_id($ch) : (int)$ch;
            $map[$id] = ['key' => $curl['key'], 'callback' => isset($curl['callback']) ? $curl['callback'] : null];
        }
        $debug = [];
        $active = 0;
        $counter = 0;
        do {
            $debug[] = 'Start: ' . sprintf('%.6fs', microtime(true) - $now);
            do {
                $mrc = curl_multi_exec($queue, $active);
                $debug[] = 'Exec: ' . sprintf('%.6fs', microtime(true) - $now) . ', [mrc=' . $mrc . '; active=' . $active . ']';
            } while ($mrc == CURLM_CALL_MULTI_PERFORM);
            // One or more requests may have completed; find out which
            while ($done = curl_multi_info_read($queue)) {
                $spend = sprintf('%.6fs', microtime(true) - $now);
                $id = is_object($done['handle']) ? spl_object_id($done['handle']) : (int)$done['handle'];
                if (!isset($map[$id])) {
                    continue;
                }
                $url = $map[$id];
                $debug[] = '**************';
                $debug[] = 'Readed: ' . sprintf('%.6fs', microtime(true) - $now) . ', [' . $url['key'] . ']';
                $error = curl_error($done['handle']);
                $debug[] = 'Error: ' . sprintf('%.6fs', microtime(true) - $now) . ', [' . $error . ']';
                $result = curl_multi_getcontent($done['handle']);
                if (isset($url['callback'])) {
                    $debug[] = 'callbacking: ' . sprintf('%.6fs', microtime(true) - $now);
                    $result = $url['callback']($result, $error);
                    $debug[] = 'callbacked: ' . sprintf('%.6fs', microtime(true) - $now);
                }
                $ret[$url['key']] = compact('spend', 'error', 'result');
                // Remove and close the handle that just completed
                curl_multi_remove_handle($queue, $done['handle']);
                curl_close($done['handle']);
                $debug[] = '**************';
            }
            // Block until there is activity on a connection; errors are reported by curl_multi_exec
            if ($active > 0) {
                $debug[] = 'Select: ' . sprintf('%.6fs', microtime(true) - $now) . ', [active=' . $active . ']';
                if (curl_multi_select($queue) == -1) {
                    usleep(1);
                }
            }
            $debug[] = '-------' . $counter . '-------';
            $counter++;
        } while ($active && $mrc == CURLM_OK);
        // Close the multi handle
        curl_multi_close($queue);
        return ['memory' => round((memory_get_usage() - $mud) / 1024, 6) . 'kb',
            'spend' => sprintf('%.6fs', microtime(true) - $now), 'debug' => $debug, 'result' => $ret];
    }


Test script:

    $urls = [
        [
            'key' => 'kuaidi100',
            'url' => 'http://www.kuaidi100.com/query',
            'type' => 'get',
            'params' => ['postid' => '800125432030318719', 'type' => 'yuantong'],
            'timeout' => 3,
            'callback' => function ($res) {
                sleep(1); // Deliberately wait one second to test whether the callback blocks
                return json_decode($res, true);
            }
        ],
        [
            'key' => 'ip',
            'url' => 'http://ip.taobao.com/service/getIpInfo.php',
            'type' => 'get',
            'params' => ['ip' => ''],
            'timeout' => 3
        ]
    ];
    $ret = debugRollingCurl($urls);
    echo json_encode($ret);
    exit();


Output:

    {
        "memory": "21.523438kb",
        "debug": [
            "Start: 0.000073s",
            "Exec: 0.000262s, [mrc=0; active=2]",
            "Select: 0.000267s, [active=2]",
            "-------0-------",
            "Start: 0.008574s",
            "Exec: 0.008662s, [mrc=0; active=2]",
            "Select: 0.008669s, [active=2]",
            "-------1-------",
            "Start: 0.032995s",
            "Exec: 0.033079s, [mrc=0; active=2]",
            "Select: 0.033085s, [active=2]",
            "-------2-------",
            "Start: 0.047638s",
            "Exec: 0.047709s, [mrc=0; active=2]",
            "Select: 0.047716s, [active=2]",
            "-------3-------",
            "Start: 0.050612s",
            "Exec: 0.050677s, [mrc=0; active=1]",
            "**************",
            "Readed: 0.050693s, [kuaidi100]",
            "Error: 0.050697s, []",
            "callbacking: 0.050700s",
            "callbacked: 1.124298s",
            "**************",
            "Select: 1.124344s, [active=1]",
            "-------4-------",
            "Start: 1.124364s",
            "Exec: 1.124417s, [mrc=0; active=0]",
            "**************",
            "Readed: 1.124429s, [ip]",
            "Error: 1.124433s, []",
            "**************",
            "-------5-------"
        ],
        "result": {
            "kuaidi100": {
                "spend": "1.124311s",
                "error": "",
                "result": {
                    "message": "ok",
                    "nu": "800125432030318719",
                    "ischeck": "1",
                    "condition": "F00",
                    "com": "yuantong",
                    "status": "200",
                    "state": "3",
                    "data": [
                        {
                            "time": "2018-06-14 11:50:36",
                            "ftime": "2018-06-14 11:50:36",
                            "context": "客户 签收人: 已签收,签收人凭取货码签收。 已签收 感谢使用圆通速递,期待再次为您服务",
                            "location": ""
                        },
                        {
                            "time": "2018-06-14 09:37:25",
                            "ftime": "2018-06-14 09:37:25",
                            "context": "快件已被明福智富广场二座速递易【自提柜】代收,请及时取件。有问题请联系派件员15295855857",
                            "location": ""
                        },
                        {
                            "time": "2018-06-14 09:10:14",
                            "ftime": "2018-06-14 09:10:14",
                            "context": "【广东省佛山市江湾公司】 派件人: 彭明喜 派件中 派件员电话15295855857",
                            "location": ""
                        },
                        {
                            "time": "2018-06-14 08:39:00",
                            "ftime": "2018-06-14 08:39:00",
                            "context": "【广东省佛山市江湾公司】 已收入",
                            "location": ""
                        },
                        {
                            "time": "2018-06-14 02:17:01",
                            "ftime": "2018-06-14 02:17:01",
                            "context": "【广东省佛山市南海公司】 已发出 下一站 【广东省佛山市江湾公司】",
                            "location": ""
                        },
                        {
                            "time": "2018-06-14 00:30:43",
                            "ftime": "2018-06-14 00:30:43",
                            "context": "【佛山转运中心】 已发出 下一站 【广东省佛山市南海公司】",
                            "location": ""
                        },
                        {
                            "time": "2018-06-14 00:25:41",
                            "ftime": "2018-06-14 00:25:41",
                            "context": "【佛山转运中心】 已收入",
                            "location": ""
                        },
                        {
                            "time": "2018-06-12 23:16:44",
                            "ftime": "2018-06-12 23:16:44",
                            "context": "【宁波转运中心】 已发出 下一站 【佛山转运中心】",
                            "location": ""
                        },
                        {
                            "time": "2018-06-12 23:13:46",
                            "ftime": "2018-06-12 23:13:46",
                            "context": "【宁波转运中心】 已收入",
                            "location": ""
                        },
                        {
                            "time": "2018-06-12 20:53:52",
                            "ftime": "2018-06-12 20:53:52",
                            "context": "【浙江省宁波市慈杭新区公司】 已发出 下一站 【宁波转运中心】",
                            "location": ""
                        },
                        {
                            "time": "2018-06-12 20:19:17",
                            "ftime": "2018-06-12 20:19:17",
                            "context": "【浙江省宁波市慈杭新区公司】 已打包",
                            "location": ""
                        },
                        {
                            "time": "2018-06-12 18:47:10",
                            "ftime": "2018-06-12 18:47:10",
                            "context": "【浙江省宁波市慈杭新区公司】 已收件",
                            "location": ""
                        }
                    ]
                }
            },
            "ip": {
                "spend": "1.124435s",
                "error": "",
                "result": "{\"code\":0,\"data\":{\"ip\":\"\",\"country\":\"美国\",\"area\":\"\",\"region\":\"华盛顿\",\"city\":\"西雅图\",\"county\":\"XX\",\"isp\":\"电讯盈科\",\"country_id\":\"US\",\"area_id\":\"\",\"region_id\":\"US_147\",\"city_id\":\"US_1107\",\"county_id\":\"xx\",\"isp_id\":\"3000107\"}}\n"
            }
        },
        "spend": "1.124552s"
    }


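The relevant part of the trace:

    ...
    "callbacking: 0.050700s",
    "callbacked: 1.124298s",
    ...

About 1.07 seconds pass between these two entries, and the second request is only read after the callback returns. The callback runs synchronously inside the loop, so a slow callback blocks it.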
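
The gap between the callbacking and callbacked entries can also be computed rather than read by eye. The sketch below assumes the debug strings keep the "label: 0.000000s" format produced by debugRollingCurl; the helper name debugTimestamp is hypothetical.

```php
<?php
// Extract the elapsed-seconds value from a debug entry such as
// "callbacking: 0.050700s". Returns null for entries of any other shape,
// for example the "-------0-------" separators.
function debugTimestamp(string $entry): ?float
{
    if (preg_match('/^\w+: (\d+\.\d+)s/', $entry, $m) === 1) {
        return (float) $m[1];
    }
    return null;
}

$before = debugTimestamp('callbacking: 0.050700s');
$after  = debugTimestamp('callbacked: 1.124298s');
printf("callback blocked the loop for %.6fs\n", $after - $before);
```

Running the same subtraction over every callbacking/callbacked pair in a trace gives a quick estimate of how much of the total time is spent inside callbacks rather than waiting on the network.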



