首頁  >  文章  >  後端開發  >  PHP使用SWOOLE擴充實作定時同步

PHP使用SWOOLE擴充實作定時同步

不言
不言原創
2018-06-07 17:30:542882瀏覽

本文要跟大家介紹的是一次比較特殊的任務,我們使用PHP SWOOLE 做一個異步的定時任務系統,具體如何來實現的呢,接下來來我們好好看看吧

南寧公司和幾個分公司之間都使用了呼叫系統,然後現在需要做一個呼叫通話數據分析,由於分公司的呼叫伺服器是在內網,透過技術手段映射出來,分公司到南寧之間的網絡不穩定,所以需要把分公司的通話資料同步到南寧。

本身最簡單的方法就是直接設定MySQL的主從同步就可以同步資料到南寧來了。但是銷售呼叫系統那邊的公司不給MySQL權限我們。所以這個方法只能放棄了。

於是我們乾脆的想,使用PHP來實現定時一個簡易的PHP定時同步工具,然後PHP進程常駐後台運行,所以首先就先到了一個PHP組件:SWOOLE,經過討論,分公司的每天半天產生的資料量最大在5000條左右,所以這個方案是可行,就這樣乾。

我們使用PHP SWOOLE 做一個非同步的定時任務系統。

本身MySQL資料庫的主從同步是透過解析Master庫中的binary-log來進行同步資料到從庫的。然而我們使用PHP來同步數據的時候,那麼只能從master庫分批查詢數據,然後插入到南寧的slave庫來了。

這裡我們使用的框架是ThinkPHP 3.2 .

首先安裝PHP擴充: SWOOLE,因為沒有使用到特別的功能,所以這裡我們使用pecl來快速安裝:

pecl install swoole

安裝完成後在php.ini 裡面加入extension="swoole.so" 安裝完成後,我們使用phpinfo() 來檢查是否成功了.

安裝成功了,我們就來寫業務.

服務端

#1、先啟動一個後台的服務端,監聽埠9501

public function index()
{
 $serv = new \swoole_server("0.0.0.0", 9501);
 $serv->set([
  'worker_num' => 1,//一般设置为服务器CPU数的1-4倍
  'task_worker_num' => 8,//task进程的数量
  'daemonize' => 1,//以守护进程执行
  'max_request' => 10000,//最大请求数量
  "task_ipc_mode " => 2 //使用消息队列通信,并设置为争抢模式
 ]);
 $serv->on('Receive', [$this, 'onReceive']);//接收任务,并投递
 $serv->on('Task', [$this, 'onTask']);//可以在这个方法里面处理任务
 $serv->on('Finish', [$this, 'onFinish']);//任务完成时候调用
 $serv->start();
}

2、接收與投遞任務

public function onReceive($serv, $fd, $from_id, $data)
{
 //使用json_decode 解析任务数据
 $areas = json_decode($data,true);
 foreach ($areas as $area){
  //投递异步任务
  $serv->task($area);
 }
}

3、任務執行,資料從master函式庫查詢並寫入到slave資料庫

public function onTask($serv, $task_id, $from_id, $task_data)
{
 $area = $task_data;//参数是地区编号
 $rows = 50; //每页多少条
 //主库地址,根据参数地区($area)编号切换master数据库连接
 //从库MySQL实例,根据参数地区($area)编号切换slave数据库连接
 //由于程序是常驻内存的,所以MySQL连接可以使用长连接,然后重复利用。要使用设计模式的,可以使用对象池模式
 Code......

 //master 库为分公司的数据库,slave库为数据同步到南宁后的从库
 Code......

 //使用$sql获取从库中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1
 $slaveMaxIncrementId = ...;

 //使用$sql获取主库中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1
 $masterMaxIncrementId = ...;

 //如果相等的就不同步了
 if($slaveMaxIncrementId >= $masterMaxIncrementId){
  return false;
 }

 //根据条数计算页数
 $dataNumber = ceil($masterMaxIncrementId - $slaveMaxIncrementId);
 $eachNumber = ceil($dataNumber / $rows);
 $left = 0;

 //根据页数来进行分批循环进行写入,要记得及时清理内存
 for ($i = 0; $i < $eachNumber; $i++) {
  $left = $i == 0 ? $slaveMaxIncrementId : $left + $rows;
  $right = $left + $rows;
  //生成分批查询条件
  //$where = "id > $left AND <= $right";
  $masterData = ...;//从主库查询数据
  $slaveLastInsertId = ...;//插入到从库
  unset($masterData,$slaveLastInsertId);
 }

 echo "New AsyncTask[id=$task_id]".PHP_EOL;
 $serv->finish("$area -> OK");
}

4、任務完成時候呼叫

public function onFinish($serv, $task_id, $task_data)
{
 echo "AsyncTask[$task_id] Finish: $task_data".PHP_EOL;
}

客戶端推送任務

到此基本完成,剩下來我們來寫客戶端任務推送

public function index()
{
 $client = new \swoole_client(SWOOLE_SOCK_TCP);
 if (!$client->connect(&#39;127.0.0.1&#39;, 9501, 1)) {
  throw new Exception(&#39;链接SWOOLE服务错误&#39;);
 }
 $areas = json_encode([&#39;liuzhou&#39;,&#39;yulin&#39;,&#39;beihai&#39;,&#39;guilin&#39;]);
 //开始遍历检查
 $client->send($areas);
 echo "任务发送成功".PHP_EOL;
}

至此基本上完成了,剩下的我們來寫一個shell腳本定時執行:/home/wwwroot/sync_db/crontab/send.sh

#!/bin/bash
PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin
export PATH

# 定时推送异步的数据同步任务
/usr/bin/php /home/wwwroot/sync_db/server.php home/index/index

使用crontab定時任務,我們把腳本加入定時任務

#设置每天12:30执行数据同步任务
30 12 * * * root /home/wwwroot/sync_db/crontab/send.sh
#设置每天19:00执行数据同步任务
0 19 * * * root /home/wwwroot/sync_db/crontab/send.sh

Tips: 最好推薦在裡面加入寫日誌操作,這樣好知道是任務推送、執行是否成功。

至此基本完成,程式有待優化~~~,各位看客有更好的方法歡迎提出。

以上就是本文的全部內容,希望對大家的學習有所幫助,更多相關內容請關注PHP中文網!

相關推薦:

PHP實作查詢兩個陣列中不同元素的方法

php實作mysql連線池的效果

#

以上是PHP使用SWOOLE擴充實作定時同步的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn