本文要跟大家介紹的是一次比較特殊的任務,我們使用PHP SWOOLE 做一個異步的定時任務系統,具體如何來實現的呢,接下來來我們好好看看吧
南寧公司和幾個分公司之間都使用了呼叫系統,然後現在需要做一個呼叫通話數據分析,由於分公司的呼叫伺服器是在內網,透過技術手段映射出來,分公司到南寧之間的網絡不穩定,所以需要把分公司的通話資料同步到南寧。
本身最簡單的方法就是直接設定MySQL的主從同步就可以同步資料到南寧來了。但是銷售呼叫系統那邊的公司不給MySQL權限我們。所以這個方法只能放棄了。
於是我們乾脆的想,使用PHP來實現定時一個簡易的PHP定時同步工具,然後PHP進程常駐後台運行,所以首先就先到了一個PHP組件:SWOOLE,經過討論,分公司的每天半天產生的資料量最大在5000條左右,所以這個方案是可行,就這樣乾。
我們使用PHP SWOOLE 做一個非同步的定時任務系統。
本身MySQL資料庫的主從同步是透過解析Master庫中的binary-log來進行同步資料到從庫的。然而我們使用PHP來同步數據的時候,那麼只能從master庫分批查詢數據,然後插入到南寧的slave庫來了。
這裡我們使用的框架是ThinkPHP 3.2
.
首先安裝PHP擴充: SWOOLE,因為沒有使用到特別的功能,所以這裡我們使用pecl來快速安裝:
pecl install swoole
安裝完成後在php.ini
裡面加入extension="swoole.so"
安裝完成後,我們使用phpinfo()
來檢查是否成功了.
安裝成功了,我們就來寫業務.
服務端
#1、先啟動一個後台的服務端,監聽埠9501
public function index() { $serv = new \swoole_server("0.0.0.0", 9501); $serv->set([ 'worker_num' => 1,//一般设置为服务器CPU数的1-4倍 'task_worker_num' => 8,//task进程的数量 'daemonize' => 1,//以守护进程执行 'max_request' => 10000,//最大请求数量 "task_ipc_mode " => 2 //使用消息队列通信,并设置为争抢模式 ]); $serv->on('Receive', [$this, 'onReceive']);//接收任务,并投递 $serv->on('Task', [$this, 'onTask']);//可以在这个方法里面处理任务 $serv->on('Finish', [$this, 'onFinish']);//任务完成时候调用 $serv->start(); }
2、接收與投遞任務
public function onReceive($serv, $fd, $from_id, $data) { //使用json_decode 解析任务数据 $areas = json_decode($data,true); foreach ($areas as $area){ //投递异步任务 $serv->task($area); } }
3、任務執行,資料從master函式庫查詢並寫入到slave資料庫
public function onTask($serv, $task_id, $from_id, $task_data) { $area = $task_data;//参数是地区编号 $rows = 50; //每页多少条 //主库地址,根据参数地区($area)编号切换master数据库连接 //从库MySQL实例,根据参数地区($area)编号切换slave数据库连接 //由于程序是常驻内存的,所以MySQL连接可以使用长连接,然后重复利用。要使用设计模式的,可以使用对象池模式 Code...... //master 库为分公司的数据库,slave库为数据同步到南宁后的从库 Code...... //使用$sql获取从库中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1 $slaveMaxIncrementId = ...; //使用$sql获取主库中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1 $masterMaxIncrementId = ...; //如果相等的就不同步了 if($slaveMaxIncrementId >= $masterMaxIncrementId){ return false; } //根据条数计算页数 $dataNumber = ceil($masterMaxIncrementId - $slaveMaxIncrementId); $eachNumber = ceil($dataNumber / $rows); $left = 0; //根据页数来进行分批循环进行写入,要记得及时清理内存 for ($i = 0; $i < $eachNumber; $i++) { $left = $i == 0 ? $slaveMaxIncrementId : $left + $rows; $right = $left + $rows; //生成分批查询条件 //$where = "id > $left AND <= $right"; $masterData = ...;//从主库查询数据 $slaveLastInsertId = ...;//插入到从库 unset($masterData,$slaveLastInsertId); } echo "New AsyncTask[id=$task_id]".PHP_EOL; $serv->finish("$area -> OK"); }
4、任務完成時候呼叫
public function onFinish($serv, $task_id, $task_data) { echo "AsyncTask[$task_id] Finish: $task_data".PHP_EOL; }
客戶端推送任務
到此基本完成,剩下來我們來寫客戶端任務推送
public function index() { $client = new \swoole_client(SWOOLE_SOCK_TCP); if (!$client->connect('127.0.0.1', 9501, 1)) { throw new Exception('链接SWOOLE服务错误'); } $areas = json_encode(['liuzhou','yulin','beihai','guilin']); //开始遍历检查 $client->send($areas); echo "任务发送成功".PHP_EOL; }
至此基本上完成了,剩下的我們來寫一個shell腳本定時執行:/home/wwwroot/sync_db/crontab/send.sh
#!/bin/bash PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin export PATH # 定时推送异步的数据同步任务 /usr/bin/php /home/wwwroot/sync_db/server.php home/index/index
使用crontab定時任務,我們把腳本加入定時任務
#设置每天12:30执行数据同步任务 30 12 * * * root /home/wwwroot/sync_db/crontab/send.sh #设置每天19:00执行数据同步任务 0 19 * * * root /home/wwwroot/sync_db/crontab/send.sh
Tips: 最好推薦在裡面加入寫日誌操作,這樣好知道是任務推送、執行是否成功。
至此基本完成,程式有待優化~~~,各位看客有更好的方法歡迎提出。
以上就是本文的全部內容,希望對大家的學習有所幫助,更多相關內容請關注PHP中文網!
相關推薦:
#
以上是PHP使用SWOOLE擴充實作定時同步的詳細內容。更多資訊請關注PHP中文網其他相關文章!