博客列表 >php+websocket主动推送心得总结

php+websocket主动推送心得总结

雨殇的博客
雨殇的博客原创
2017年11月01日 16:15:471284浏览

    关于websocket是基于tcp协议的长链接协议,一个多月时间我脑细胞死了不知道多少,再加上php本身的坑让我水生火热的煎熬了许久。一开始用原生写了websocket服务端几个版本无一例外在和当前项目衔接过程中短路了,自己石乐志一般以为php可以实现主推。撞破了南墙还往前跑,最后问某某群中的大神们,回答我一个残酷的事实,直觉往往是错觉,无奈之下从头开始用框架,swoole开始继续爬坑、下面是我爬坑的原生几个websocket版本。

服务端代码:

<?php

/* * *****websocket类库第n版****** */

class websocket {


    public $log;

    public $event;

    public $signets;

    public $users;

    public $master;

    public $push;


    public function __construct($config) {

//        if (substr(php_sapi_name(), 0, 3) !== 'cli') {

//            die("请通过命令行模式运行!");

//        }

        error_reporting(E_ALL);

        set_time_limit(0);

        ob_implicit_flush();

        $this->event = $config['event'];

        $this->push = $config['push'];

        $this->log = $config['log'];

        $this->master = $this->WebSocket($config['address'], $config['port']);

        $this->sockets = array('s' => $this->master);

    }


    function WebSocket($address, $port) {

        $server = $this->soc = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

//         $this->soc =$this->createSocket($address,$port);

        $this->socs = array($this->soc);

        socket_set_option($server, SOL_SOCKET, SO_REUSEADDR, 1);

        socket_bind($server, $address, $port);

        socket_listen($server);

        $this->log('Start Listenning: ' . $address . ' : ' . $port);

        return $server;

    }


    function run() {

        while (true) {

            $changes = $this->sockets;

            @socket_select($changes, $write = NULL, $except = NULL, NULL);

            foreach ($changes as $sign) {

                if ($sign == $this->master) {

                    $client = socket_accept($this->master);

                    $this->sockets[] = $client;

                    $user = array(

                        'socket' => $client,

                        'hand' => false,

                    );

                    $this->users[] = $user;

                    $k = $this->search($client);

                    $eventreturn = array('k' => $k, 'sign' => $sign);

                    $this->eventoutput('in', $eventreturn);

                } else {

                    $len = socket_recv($sign, $buffer, 20480, 0);

//                    $receive_str = '接收的客户端传输字节长度为';

//                    print_r(iconv("UTF-8", "GBK", $receive_str) . $len);

                    $k = $this->search($sign);

                    $user = $this->users[$k];

//                    var_dump('The Value Of user is ' . $client);

                    //写入数据库

                    $receive_data = json_decode($this->decode($buffer), true);

//                                      var_dump($receive_data);

                    $hardwareKey = $receive_data['req']['hardwareKey'];

                    var_dump('HardwareKey is ' . $hardwareKey);

                    if (!empty($hardwareKey)) {

                        $user_arr = array(

                            'k' => $k,

                            'hardwareKey' => $hardwareKey

                        );

//                        var_dump($user_arr);

////                    var_dump($user_arr);

                        $pdo = new PDO("mysql:host=192.168.1.50;dbname=lonbon_distribute", "root", "lonbon");

                        $sth = $pdo->prepare('INSERT INTO sock (ws_id,hardwareKey) VALUES (:ws_id, :hardwareKey)');

                        $params = array(

                            'ws_id' => $user_arr['k'],

                            'hardwareKey' => $user_arr['hardwareKey'],

                        );

                        $sth->execute($params);

                    }

                    //如果接受的是0字节

                    if ($len < 7) {

                        $this->close($sign);

                        $eventreturn = array('k' => $k, 'sign' => $sign);

                        $this->eventoutput('out', $eventreturn);

                        continue;

                    }

                    if (!$this->users[$k]['hand']) {//没有握手进行握手

                        $this->handshake($k, $buffer);

                    } else {

                        $buffer = $this->decode($buffer); //存储客户端传输字节

                        $eventreturn = array('k' => $k, 'sign' => $sign, 'msg' => $buffer);

                        $this->eventoutput('msg', $eventreturn);

//                        $this->pushoutput('activePush', $eventreturn);

//                        $this->write($k,'addasdas');

//                        $this->idwrite($sign, 'Register Message Success');

                    }

                }

            }


        }

    }


    /*     * ****广播消息第n版***** */


    function broadcast($data) {

        foreach ($this->sockets as $socket) {

            if ($socket['resource'] == $this->master) {

                continue;

            }

            $this->write($socket['resource'], $data);

        }

    }


    function search($sign) {//通过标示遍历获取id

        foreach ($this->users as $k => $v) {

            if ($sign == $v['socket'])

                return $k;

        }

        return false;

    }


    function close($sign) {//通过标示断开连接

        $k = array_search($sign, $this->sockets);

        socket_close($sign);

        unset($this->sockets[$k]);

        unset($this->users[$k]);

    }


    function handshake($k, $buffer) {

        $buf = substr($buffer, strpos($buffer, 'Sec-WebSocket-Key:') + 18);

        $key = trim(substr($buf, 0, strpos($buf, "\r\n")));

        $new_key = base64_encode(sha1($key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));

        $new_message = "HTTP/1.1 101 Switching Protocols\r\n";

        $new_message .= "Upgrade: websocket\r\n";

        $new_message .= "Sec-WebSocket-Version: 13\r\n";

        $new_message .= "Connection: Upgrade\r\n";

        $new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";

        socket_write($this->users[$k]['socket'], $new_message, strlen($new_message));

        $this->users[$k]['hand'] = true;

        return true;

    }


//解包

    function uncode($str) {

        $mask = array();

        $data = '';

        $msg = unpack('H*', $str);

        $head = substr($msg[1], 0, 2);

        if (hexdec($head{1}) === 8) {

            $data = false;

        } else if (hexdec($head{1}) === 1) {

            $mask[] = hexdec(substr($msg[1], 4, 2));

            $mask[] = hexdec(substr($msg[1], 6, 2));

            $mask[] = hexdec(substr($msg[1], 8, 2));

            $mask[] = hexdec(substr($msg[1], 10, 2));

            $s = 12;

            $e = strlen($msg[1]) - 2;

            $n = 0;

            for ($i = $s; $i <= $e; $i+= 2) {

                $data .= chr($mask[$n % 4] ^ hexdec(substr($msg[1], $i, 2)));

                $n++;

            }


            //发送数据到客户端

            //如果长度大于125 将数据分块

            $block = str_split($data, 1024);

            $mess = array(

                'mess' => $block[0],

            );

//            return json_encode($mess,TRUE);

        }

//    return $data;

        return json_encode($mess, TRUE);

    }


    //*************************/

    /**     * 打包     * @param string $buffer     */

    function encode($buffer) {

        $len = strlen($buffer);

        if ($len <= 125) {

            return "\x81" . chr($len) . $buffer;

        } else if ($len <= 65535) {

            return "\x81" . chr(126) . pack("n", $len) . $buffer;

        } else {

            //pack("xxxN", $len)pack函数只处理2的32次方大小的文件,实际上2的32次方已经4G了。 

            return "\x81" . char(127) . pack("xxxxN", $len) . $buffer;

        }

    }


    /**     * 解包     * @param string $buffer     * @return string     */

    function decode($buffer) {

        $len = $masks = $data = $decoded = null;

        $len = ord($buffer[1]) & 127;

        if ($len === 126) {

            $masks = substr($buffer, 4, 4);

            $data = substr($buffer, 8);

        } else if ($len === 127) {

            $masks = substr($buffer, 10, 4);

            $data = substr($buffer, 14);

        } else {

            $masks = substr($buffer, 2, 4);

            $data = substr($buffer, 6);

        } for ($index = 0; $index < strlen($data); $index++) {

            $decoded .= $data[$index] ^ $masks[$index % 4];

        } return $decoded;

    }


    function ord_hex($data) {

        $msg = '';

        $l = strlen($data);

        for ($i = 0; $i < $l; $i++) {

            $msg .= dechex(ord($data{$i}));

        }

        return $msg;

    }


    function idwrite($id, $t) {//通过id推送信息

        if (!$this->users[$id]['socket']) {

            return false;

        }//没有这个标示

        $t = $this->decode($t);

        return socket_write($this->users[$id]['socket'], $t, strlen($t));

    }


    function write($k, $t) {//通过标示推送信息

        $t = $this->encode($t);

        return socket_write($k, $t, strlen($t));

    }


    function eventoutput($type, $event) {//事件回调

        call_user_func($this->event, $type, $event);

    }


    function pushoutput($type, $event) {

        //推送回调

        call_user_func($this->push, $type, $event);

    }


    function log($t) {//控制台输出

        if ($this->log) {

            $t = $t . "\r\n";

            fwrite(STDOUT, iconv('utf-8', 'gbk//IGNORE', $t));

        }

    }



    public function hands($client, $buff, $v) {

//提取websocket传的key并进行加密  (这是固定的握手机制获取Sec-WebSocket-Key:里面的key)

        $buf = substr($buff, strpos($buff, 'Sec-WebSocket-Key:') + 18);

//去除换行空格字符

        $key = trim(substr($buf, 0, strpos($buf, "\r\n")));

//固定的加密算法

        $new_key = base64_encode(sha1($key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));

        $new_message = "HTTP/1.1 101 Switching Protocols\r\n";

        $new_message .= "Upgrade: websocket\r\n";

        $new_message .= "Sec-WebSocket-Version: 13\r\n";

        $new_message .= "Connection: Upgrade\r\n";

        $new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";

//将套接字写入缓冲区

        socket_write($v, $new_message, strlen($new_message));

// socket_write(socket,$upgrade.chr(0), strlen($upgrade.chr(0)));

//标记此套接字握手成功

        $this->hand[(int) $client] = true;

    }



    /* websocket主动推送数据 */


    function sendWsMessage($message) {

        $changes = $this->sockets;

        @socket_select($changes, $write = NULL, $except = NULL, NULL);

        foreach ($changes as $sign) {

            //如果有新的client连接进来,则

            if ($sock == $this->master) {


                //接受一个socket连接

                $client = socket_accept($this->master);


                //给新连接进来的socket一个唯一的ID

                $key = uniqid();

                $this->sockets[] = $client; //将新连接进来的socket存进连接池

                $this->users[$key] = array(

                    'socket' => $client, //记录新连接进来client的socket信息

                    'hand' => false    //标志该socket资源没有完成握手

                );

                //否则1.为client断开socket连接,2.client发送信息

            } else {

                $len = 0;

                $buffer = '';

                //读取该socket的信息,注意:第二个参数是引用传参即接收数据,第三个参数是接收数据的长度

                do {

                    $l = socket_recv($sock, $buffer, 1000, 0);

                    $len+=$l;

                    $buffer.=$message;

                } while ($l == 1000);


                //根据socket在user池里面查找相应的$k,即健ID

                $k = $this->search($sock);


                //如果接收的信息长度小于7,则该client的socket为断开连接

                if ($len < 7) {

                    //给该client的socket进行断开操作,并在$this->sockets和$this->users里面进行删除

                    $this->close($k);

                    //$this->send2($k);

                    continue;

                }

                //判断该socket是否已经握手

                if (!$this->users[$k]['hand']) {

                    //如果没有握手,则进行握手处理

                    $this->handshake($k, $buffer);

                } else {

                    //走到这里就是该client发送信息了,对接受到的信息进行uncode处理

                    $buffer = $this->uncode($buffer, $k);


                    if ($buffer == false) {

                        continue;

                    }

                    //如果不为空,则进行消息推送操作

                    $this->write($k, $buffer);

                }

            }

        }

    }

    function getusers() {

        $ar = array();

        foreach ($this->users as $k => $v) {

            $ar[] = array('code' => $k, 'name' => $v['name']);

        }

        return $ar;

    }

}

/*****************************************************************/

入口文件ws_start.php

<?php


/**

 * websocket 入口文件

 * 实现功能及项目总结

 * 1.接收客户端注册websocket的相关接口

 * 2.心跳包监听

 * 3.推送消息和其他业务模块实现

 * 4.新增配置项,优化结构

 * 5.注意异常处理

 * 

 * errorLog 错误 100 接收json数据错误

 * 101 参数错误(hardWareKey)

 *  */

/* 定义入口文件 */

header("Content-type: text/html; charset=utf-8");

//header("Content-type: text/html; charset=gb2312"); 

define('DIR_WEBSOCKET', dirname(__FILE__));


require('lib/app/websocket.class.php');

$config = array(

    'address' => '192.168.1.38',

    'port' => '11100',

    'event' => 'WSevent',

//    'push'=>'sendWsMessage',

    'log' => 'true',

);


$websocket = new Websocket($config);

//$websocket->sendWsMessage('qwqwqewqew');

$websocket->run();


$ws_func = new ws_function();


//function WSevent($type,$event){

//    global $websocket;

//    switch ($type){

//        case 'register':

//            $websocket->log('客户端注册标识:'.$event['hardwareKey']);

//            break;

//        case 'heartbeat':

//            $websocket->log('监听心跳包:'.$event['heartbeat']);

//            break;

//                

//            

//    }

//}


function WSevent($type, $event,$is_activesend =0) {

    global $websocket;

    if ('in' == $type) {

        $websocket->log('Client Enter Id:' . $event['k']);

    } elseif ('out' == $type) {

        $websocket->log('Client Loginout Id:' . $event['k']);

    } elseif ('msg' == $type) {

        $websocket->log($event['k'] . 'Message:' . $event['msg']);

        roboot($event['sign'], $event['msg']);

    } elseif ('push' == $type) {

        $websocket->write($event['sign'], 'ClientPush Success');

    }

}


function roboot($sign, $t) {

    global $websocket;

    global $ws_func;

//$TxtFileName = "E:\WWW\Demo.txt";

//if( ($TxtRes=fopen ($TxtFileName,"w+")) === FALSE){

//echo("Create a writable file:".$TxtFileName."Failed");

//exit();

//}

//if(!fwrite ($TxtRes,$t)){ //将信息写入文件

////echo ("Try To Write Into".$TxtFileName."Write Into".$StrConents."Failed!");

//fclose($TxtRes);

//exit();

//}

//  echo ("write to file".$TxtFileName."Write Into".$t."SUCCESS!");

// fclose ($TxtRes); 


    $receive_data = json_decode($t, TRUE);

    switch ($receive_data['action']) {

        case 'login':

            if (!empty($receive_data)) {

                $hardwareKey = $receive_data['req']['hardwareKey'];

                $seq_id = intval($receive_data['seq_id']);

                if (empty($hardwareKey)) {

                    $error_msg = array(

                        "action" => 'errorLog',

                        "req" => array(

                            "code" => '101',

                            "status" => 'hardwareKey Can Not Empty!'

                        ),

                    );

                    $error_msg_json = json_encode($error_msg);

                    $show = $error_msg_json;

                } else {

                    $data = array(

                        "action" => "login",

                        "req" => array(

                            "code" => "200",

                            "status" => "Succeess"

                        ),

                        "req_event" => 1,

                        "seq_id" => $seq_id,

                    );

                    $show = json_encode($data);

                }

            } else {

                $error_msg = array(

                    "action" => 'errorLog',

                    "req" => array(

                        "code" => '100',

                        "status" => 'Receive Json Data Can Not Empty!'

                    ),

                );

            }

            break;

        case 'name':

            $show = 'Robot';

            break;

        case 'time':

            $show = 'Now Time:' . date('Y-m-d H:i:s');

            break;

        default:

            $show = 'Robotting...';

    }

    $websocket->write($sign, $show);

}



/*websocket主动推送数据*/

function sendWsMessage_1($message){

//    $websocket->eventoutput('activePush', $eventreturn);

      $changes = $this->sockets;

       @socket_select($changes, $write = NULL, $except = NULL, NULL);

       foreach ($changes as $sign){

           //如果有新的client连接进来,则

      if($sock==$this->master){

  

//        接受一个socket连接

        $client=socket_accept($this->master);

  

        //给新连接进来的socket一个唯一的ID

        $key=uniqid();

        $this->sockets[]=$client; //将新连接进来的socket存进连接池

        $this->users[$key]=array(

          'socket'=>$client, //记录新连接进来client的socket信息

          'hand'=>false    //标志该socket资源没有完成握手

        );

      //否则1.为client断开socket连接,2.client发送信息

      }else{

        $len=0;

        $buffer='';

        //读取该socket的信息,注意:第二个参数是引用传参即接收数据,第三个参数是接收数据的长度

        do{

          $l=socket_recv($sock,$message,1000,0);

          $len+=$l;

          $buffer.=$message;

        }while($l==1000);

  

        //根据socket在user池里面查找相应的$k,即健ID

        $k=$this->search($sock);

        

        //如果接收的信息长度小于7,则该client的socket为断开连接

        if($len<7){

          //给该client的socket进行断开操作,并在$this->sockets和$this->users里面进行删除

        $this->close($k);  

          continue;

        }

        //判断该socket是否已经握手

        if(!$this->users[$k]['hand']){

          //如果没有握手,则进行握手处理

          $this->handshake($k,$buffer);

        }else{

          //走到这里就是该client发送信息了,对接受到的信息进行uncode处理

          $buffer = $this->uncode($buffer,$k);

          

          if($buffer==false){

            continue;

          }

          //如果不为空,则进行消息推送操作

          $this->write($k,$buffer);

       }

}

}

          }

?>

/*********************************************************************************/

原生第二版

服务端

<?php


//require_once('/lib/data/pdoconnect.class.php');

//下面是sock类

class Sock {


    public $sockets; //socket的连接池,即client连接进来的socket标志

    public $users;   //所有client连接进来的信息,包括socket、client名字等

    public $master;  //socket的resource,即前期初始化socket时返回的socket资源

    private $sda = array();   //已接收的数据

    private $slen = array();  //数据总长度

    private $sjen = array();  //接收数据的长度

    private $ar = array();    //加密key

    private $n = array();


    public function __construct($address, $port) {


        //创建socket并把保存socket资源在$this->master

        $this->master = $this->WebSocket($address, $port);

//        var_dump($this->master);

//       print_r('000000'.$this->master."000000");

        //创建socket连接池

        $this->sockets = array($this->master);

//        var_dump($this->sockets);

//        if (substr(php_sapi_name(), 0, 3) !== 'cli') {

//            die("请通过命令行模式运行!");

//            $pdo = new PDO("mysql:host=192.168.1.50;dbname=lonbon_distribute", "root", "lonbon");

//            $stmt = $pdo->prepare("DELETE FROM sock");

//        }

    }


    function run() {

        while (true) {

            $changes = $this->sockets; //$changes由多变1,但$this->sockets却只是稳定的+1;  

            var_dump($changes);

            $write = NULL;

            $except = NULL;

            $rr = socket_select($changes, $write, $except, NULL);

            foreach ($changes as $sock) {

                //连接主机的client  

                if ($sock == $this->master) { //---此处只用来存数据了  

                    $client = socket_accept($this->master); //resource(3, Socket)。表示接受请求,并创建一个子链接!!  

                    //var_dump($client);  

                    //exit;  

//                    $key=uniqid();  

                    $this->sockets[] = $client;

                    $this->users[] = array(

                        'socket' => $client,

                        'shou' => false

                    );

                } else { //---此处服务器与客户端发信息  

                    $len = 0;

                    $buffer = '';

                    do {

                        $l = socket_recv($sock, $buf, 20480, 0); //原来取数据是一个缓慢的过程,要一次一次取数据,并计算每次buf的长度,让总长度不超过设定值  

                        $len+=$l;

                        $buffer.=$buf;

//                        print_r($this->decode($buffer));

                    } while ($l == 20480);

                    $k = $this->search($sock); //跟据sock返回key值  

                    var_dump('The Key Value Is ' . $k);

                    $receive_data = json_decode($this->decode($buffer), true);

//                                      var_dump($receive_data);

                    $hardwareKey = $receive_data['req']['hardwareKey'];

                    if (!empty($hardwareKey)) {

                        $user_arr = array(

                            'k' => $k,

                            'hardwareKey' => $hardwareKey

                        );

//                        var_dump($user_arr);

////                    var_dump($user_arr);

                        $pdo = new PDO("mysql:host=192.168.1.50;dbname=lonbon_distribute", "root", "lonbon");

                        $sth = $pdo->prepare('INSERT INTO sock (ws_id,hardwareKey) VALUES (:ws_id, :hardwareKey)');

                        $params = array(

                            'ws_id' => $user_arr['k'],

                            'hardwareKey' => $user_arr['hardwareKey']

                        );

                        $sth->execute($params);

//                      $socket_id = $dbh->lastInsertId();

                    }

                    if ($len < 7) { //发过来的消息太短了,系统就判断 断了,断掉链接。  

                        $this->send2($k); //用户退出。1关闭这个$key值对应的socket、删除这条key记录。将sockets数组对象重新排列。  

                        //2  

                        continue;

                    }

                    if (!$this->users[$k]['shou']) {//判断用户的握手字段是true?否则重新握手。  

                        $this->woshou($k, $buffer);

                        //file_put_contents('lz.text','woshou', FILE_APPEND);  

                    } else {  //如果用户已经握手,则与用户之间进行通信。终于可以发消息了!  

                        $buffer = $this->decode($buffer, $k); //返编译  

                        if ($buffer == false) {

                            continue;

                        }

                        $buffer_n = json_decode($buffer, true);

                        $action = $buffer_n['action'];

                        if ($action == 'login') {

                            $this->send($k, $buffer);

                        } elseif ($action == 'push') {

                            var_dump("ACTIVE PUSH MODULE GO");

                            $this->s_send($k, $msg);

                        }

                    }

                }

            }

        }

    }



    //指定关闭$k对应的socket

    function close($k) {

        //断开相应socket

        socket_close($this->users[$k]['socket']);

        //删除相应的user信息

        unset($this->users[$k]);

        //重新定义sockets连接池

        $this->sockets = array($this->master);

        foreach ($this->users as $v) {

            $this->sockets[] = $v['socket'];

        }

        //输出日志

        $this->e("key:$k close");

    }


    //根据sock在users里面查找相应的$k

    function search($sock) {

        foreach ($this->users as $k => $v) {

            if ($sock == $v['socket'])

                return $k;

        }

        return false;

    }


    //根据sock在users里面查找相应的$k

    function search_1($sock) {

        foreach ($this->users as $k => $v) {

            if ($sock == $v['socket'])

                return $v['socket'];

        }

        return false;

    }


    //传相应的IP与端口进行创建socket操作

    function WebSocket($address, $port) {

        $server = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

        socket_set_option($server, SOL_SOCKET, SO_REUSEADDR, 1); //1表示接受所有的数据包

        socket_bind($server, $address, $port);

        socket_listen($server);

//        $this->e('Server Started : ' . date('Y-m-d H:i:s'));

//        $this->e('Listening on   : ' . $address . ' port ' . $port);

        return $server;

    }


    /*

     * 函数说明:对client的请求进行回应,即握手操作

     * @$k clien的socket对应的健,即每个用户有唯一$k并对应socket

     * @$buffer 接收client请求的所有信息

     */


    function woshou($k, $buffer) {


        //截取Sec-WebSocket-Key的值并加密,其中$key后面的一部分258EAFA5-E914-47DA-95CA-C5AB0DC85B11字符串应该是固定的

        $buf = substr($buffer, strpos($buffer, 'Sec-WebSocket-Key:') + 18);

        $key = trim(substr($buf, 0, strpos($buf, "\r\n")));

        $new_key = base64_encode(sha1($key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));


        //按照协议组合信息进行返回

        $new_message = "HTTP/1.1 101 Switching Protocols\r\n";

        $new_message .= "Upgrade: websocket\r\n";

        $new_message .= "Sec-WebSocket-Version: 13\r\n";

        $new_message .= "Connection: Upgrade\r\n";

        $new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";

        socket_write($this->users[$k]['socket'], $new_message, strlen($new_message));


        //对已经握手的client做标志

        $this->users[$k]['shou'] = true;

        return true;

    }


    //解码函数

    function uncode($str, $key) {

        $mask = array();

        $data = '';

        $msg = unpack('H*', $str);

        $head = substr($msg[1], 0, 2);

        if ($head == '81' && !isset($this->slen[$key])) {

            $len = substr($msg[1], 2, 2);

            $len = hexdec($len); //把十六进制的转换为十进制

            if (substr($msg[1], 2, 2) == 'fe') {

                $len = substr($msg[1], 4, 4);

                $len = hexdec($len);

                $msg[1] = substr($msg[1], 4);

            } else if (substr($msg[1], 2, 2) == 'ff') {

                $len = substr($msg[1], 4, 16);

                $len = hexdec($len);

                $msg[1] = substr($msg[1], 16);

            }

            $mask[] = hexdec(substr($msg[1], 4, 2));

            $mask[] = hexdec(substr($msg[1], 6, 2));

            $mask[] = hexdec(substr($msg[1], 8, 2));

            $mask[] = hexdec(substr($msg[1], 10, 2));

            $s = 12;

            $n = 0;

        } else if ($this->slen[$key] > 0) {

            $len = $this->slen[$key];

            $mask = $this->ar[$key];

            $n = $this->n[$key];

            $s = 0;

        }


        $e = strlen($msg[1]) - 2;

        for ($i = $s; $i <= $e; $i+= 2) {

            $data .= chr($mask[$n % 4] ^ hexdec(substr($msg[1], $i, 2)));

            $n++;

        }

        $dlen = strlen($data);


        if ($len > 255 && $len > $dlen + intval($this->sjen[$key])) {

            $this->ar[$key] = $mask;

            $this->slen[$key] = $len;

            $this->sjen[$key] = $dlen + intval($this->sjen[$key]);

            $this->sda[$key] = $this->sda[$key] . $data;

            $this->n[$key] = $n;

            return false;

        } else {

            unset($this->ar[$key], $this->slen[$key], $this->sjen[$key], $this->n[$key]);

            $data = $this->sda[$key] . $data;

            unset($this->sda[$key]);

            return $data;

        }

    }


    //与uncode相对

    function code($msg) {

        $frame = array();

        $frame[0] = '81';

        $len = strlen($msg);

        if ($len < 126) {

            $frame[1] = $len < 16 ? '0' . dechex($len) : dechex($len);

        } else if ($len < 65025) {

            $s = dechex($len);

            $frame[1] = '7e' . str_repeat('0', 4 - strlen($s)) . $s;

        } else {

            $s = dechex($len);

            $frame[1] = '7f' . str_repeat('0', 16 - strlen($s)) . $s;

        }

        $frame[2] = $this->ord_hex($msg);

        $data = implode('', $frame);

        return pack("H*", $data);

    }


    /**     * 打包     * @param string $buffer     */

    function encode($buffer) {

        $len = strlen($buffer);

        if ($len <= 125) {

            return "\x81" . chr($len) . $buffer;

        } else if ($len <= 65535) {

            return "\x81" . chr(126) . pack("n", $len) . $buffer;

        } else {

            //pack("xxxN", $len)pack函数只处理2的32次方大小的文件,实际上2的32次方已经4G了。 

            return "\x81" . char(127) . pack("xxxxN", $len) . $buffer;

        }

    }


    /**     * 解包     * @param string $buffer     * @return string     */

    function decode($buffer) {

        $len = $masks = $data = $decoded = null;

        $len = ord($buffer[1]) & 127;

        if ($len === 126) {

            $masks = substr($buffer, 4, 4);

            $data = substr($buffer, 8);

        } else if ($len === 127) {

            $masks = substr($buffer, 10, 4);

            $data = substr($buffer, 14);

        } else {

            $masks = substr($buffer, 2, 4);

            $data = substr($buffer, 6);

        } for ($index = 0; $index < strlen($data); $index++) {

            $decoded .= $data[$index] ^ $masks[$index % 4];

        } return $decoded;

    }


    function ord_hex($data) {

        $msg = '';

        $l = strlen($data);

        for ($i = 0; $i < $l; $i++) {

            $msg .= dechex(ord($data{$i}));

        }

        return $msg;

    }

    //用户加入  

    function send($k, $msg) {

        parse_str($msg, $g); //把查询字符串解析到变量中  

        $g1 = json_decode($json);

        var_dump($g1);

        $ar = array();

        if ($g['type'] == 'add') {

            $this->users[$k]['name'] = $g['ming'];

            $ar['type'] = 'add';

            $ar['name'] = $g['ming'];

            $key = 'all';

        } else {

//            $ar['nrong'] = $g['nr'];

            $key = $g['key'];

        }

        $receive_data = json_decode($msg, true);

        $seq_id = intval($receive_data['seq_id']);

        $this->send1($k, $ar, $key, $seq_id);

    }


    //用户加入  

    function s_send($k, $type) {

        if ($type == 'push') {

//            parse_str($msg, $g); //把查询字符串解析到变量中  

            $ar = array();

            if ($g['type'] == 'add') {

                $this->users[$k]['name'] = $g['ming'];

                $ar['type'] = 'add';

                $ar['name'] = $g['ming'];

                $key = 'all';

            } else {

//            $ar['nrong'] = $g['nr'];

                $key = $g['key'];

            }

            $receive_data = json_decode($msg, true);

            $seq_id = intval($receive_data['seq_id']);

            $this->send3($k, $ar, $key, $seq_id);

        }else{

            return false;

        }

    }


    //对新加入的client推送已经在线的client

    function getusers() {

        $ar = array();

        foreach ($this->users as $k => $v) {

            $ar[] = array('code' => $k, 'name' => $v['name']);

        }

        return $ar;

    }


    function send3($k, $ar, $key = 'all', $seq_id) {

        $data = array(

            "action" => "recmsg",

            "req" => array(

                "Command" => "update"

            ),

            "req_event" => 1,

            "seq_id" => $seq_id,

        );


        $ar['action'] = $data["action"];

        $ar["req"] = $data["req"];

        $ar["req_event"] = $data["req_event"];

        $ar["seq_id"] = $data["seq_id"];

//        $ar['code'] = $k;

//        $ar['time'] = date('m-d H:i:s');

        //对发送信息进行编码处理

        $str = $this->encode(json_encode($ar));

        //面对大家即所有在线者发送信息

        if ($key == 'all') {

            $users = $this->users;

            //如果是add表示新加的client

            if ($ar['type'] == 'push') {

                $ar['type'] = 'mpush';

                $ar['users'] = $this->getusers();        //取出所有在线者,用于显示在在线用户列表中

                $str1 = $this->code(json_encode($ar)); //单独对新client进行编码处理,数据不一样

                var_dump($str1);

                //对新client自己单独发送,因为有些数据是不一样的

                socket_write($users[$k]['socket'], $str1, strlen($str1));

                //上面已经对client自己单独发送的,后面就无需再次发送,故unset

//                unset($users[$k]);

            }

            //除了新client外,对其他client进行发送信息。数据量大时,就要考虑延时等问题了

            foreach ($users as $v) {

                socket_write($v['socket'], $str, strlen($str));

            }

        } else {

            //单独对个人发送信息,即双方聊天

            socket_write($this->users[$k]['socket'], $str, strlen($str));

            socket_write($this->users[$key]['socket'], $str, strlen($str));

        }

    }


    function send1($k, $ar, $key = 'all', $seq_id) {

        $data = array(

            "action" => "login",

            "req" => array(

                "code" => "200",

                "status" => "Succeess"

            ),

            "req_event" => 1,

            "seq_id" => $seq_id,

        );


        $ar['action'] = $data["action"];

        $ar["req"] = $data["req"];

        $ar["req_event"] = $data["req_event"];

        $ar["seq_id"] = $data["seq_id"];

//        $ar['code'] = $k;

//        $ar['time'] = date('m-d H:i:s');

        //对发送信息进行编码处理

        $str = $this->encode(json_encode($ar));

        //面对大家即所有在线者发送信息

        if ($key == 'all') {

            $users = $this->users;

            //如果是add表示新加的client

            if ($ar['type'] == 'add') {

                $ar['type'] = 'madd';

                $ar['users'] = $this->getusers();        //取出所有在线者,用于显示在在线用户列表中

                $str1 = $this->code(json_encode($ar)); //单独对新client进行编码处理,数据不一样

                //对新client自己单独发送,因为有些数据是不一样的

                socket_write($users[$k]['socket'], $str1, strlen($str1));

                //上面已经对client自己单独发送的,后面就无需再次发送,故unset

//                unset($users[$k]);

            } elseif ($ar['type'] == 'push') {

                $ar['type'] = 'mpush';

                $ar['users'] = $this->getusers();

                $str2 = $this->$this->code(json_encode($ar));

                socket_write($users[$k]['socket'], $str1, strlen($str2));

            }

            //除了新client外,对其他client进行发送信息。数据量大时,就要考虑延时等问题了

            foreach ($users as $v) {

                socket_write($v['socket'], $str, strlen($str));

            }

        } else {

            //单独对个人发送信息,即双方聊天

            socket_write($this->users[$k]['socket'], $str, strlen($str));

            socket_write($this->users[$key]['socket'], $str, strlen($str));

        }

    }


    //用户退出向所用client推送信息

    function send2($k) {

        $this->close($k);

        $ar['type'] = 'rmove';

        $ar['nrong'] = $k;

        $this->send1(false, $ar, 'all');

    }


    //记录日志

    function e($str, $data) {

        //$path=dirname(__FILE__).'/log.txt';

        $str = $str . "\n";

        //error_log($str,3,$path);

        //编码处理

        echo iconv('utf-8', 'gbk//IGNORE', $str);

    }


    function bd_send($sign, $t) {

        $receive_data = json_decode($t, TRUE);

        switch ($receive_data['action']) {

            case 'login':

                if (!empty($receive_data)) {

                    $hardwareKey = $receive_data['req']['hardwareKey'];

                    $seq_id = intval($receive_data['seq_id']);

                    print_r("receive SeQ_ID IS " . $hardwareKey);

                    if (empty($hardwareKey)) {

                        $error_msg = array(

                            "action" => 'errorLog',

                            "req" => array(

                                "code" => '101',

                                "status" => 'hardwareKey Can Not Empty!'

                            ),

                        );

                        $error_msg_json = json_encode($error_msg);

                        $show = $error_msg_json;

                    } else {

                        $data = array(

                            "action" => "login",

                            "req" => array(

                                "code" => "200",

                                "status" => "Succeess"

                            ),

                            "req_event" => 1,

                            "seq_id" => $seq_id,

                        );

                        $show = json_encode($data);

                        var_dump('RETURN DATA IS' . $show);

                    }

                } else {

                    $error_msg = array(

                        "action" => 'errorLog',

                        "req" => array(

                            "code" => '100',

                            "status" => 'Receive Json Data Can Not Empty!'

                        ),

                    );

                }

                break;

            case 'name':

                $show = 'Robot';

                break;

            case 'time':

                $show = 'Now Time:' . date('Y-m-d H:i:s');

                break;

            default:

                $data = array(

                    "action" => "login",

                    "req" => array(

                        "code" => "200",

                        "status" => "Succeess"

                    ),

                    "req_event" => 1,

                    "seq_id" => $seq_id,

                );

                $show = json_encode($data);

        }

        $this->write($sign, $show);

    }


    function write($k, $t) {//通过标示推送信息

//        var_dump('RECEIVE WRITE k IS: ' . $k);

//        $t = $this->decode($t);

        var_dump('Send Data Is: ' . $t);

        return socket_write($k, $t, strlen($t));

    }


}


?>

/**********************************************************************************/

入口文件

<?php 

require('lib/app/sock.class.php');

//require('lib/data/display.data.class.php');

error_reporting(E_ALL ^ E_NOTICE);

ob_implicit_flush();

//require_once('../data/data.class.php');

//地址与接口,即创建socket时需要服务器的IP和端口

$sk = new Sock('192.168.1.38', 11100);

//对创建的socket循环进行监听,处理数据

$sk->run();








        在五月天的倔强中我改用了网上的swoole+redis的思路,为什么不钻了?因为直觉是错觉啊o(╥﹏╥)o。尝试下是可以实现主推的(参考http://blog.csdn.net/yeshencat/article/details/72900895),但思路不一样的是把服务端的项目整体看做客户端,实现了整体推送,就swoole+redis给了思路,用swoole+mysql肯定也可以实现。经过一周的爬坑,将swoole和redis看入门了,毕竟之前原生弄了那么久o(╥﹏╥)o。只剩下如何和当前项目关联了,主动推送实现了不过是广播,关于业务中关联指定的客户端id和对应的websocket唯一标识这块也实现了,剩下部分就是关联客户端id带入推送了。

声明:本文内容转载自脚本之家,由网友自发贡献,版权归原作者所有,如您发现涉嫌抄袭侵权,请联系admin@php.cn 核实处理。
全部评论
文明上网理性发言,请遵守新闻评论服务协议