关于websocket是基于tcp协议的长链接协议,一个多月时间我脑细胞死了不知道多少,再加上php本身的坑让我水生火热的煎熬了许久。一开始用原生写了websocket服务端几个版本无一例外在和当前项目衔接过程中短路了,自己石乐志一般以为php可以实现主推。撞破了南墙还往前跑,最后问某某群中的大神们,回答我一个残酷的事实,直觉往往是错觉,无奈之下从头开始用框架,swoole开始继续爬坑、下面是我爬坑的原生几个websocket版本。
服务端代码:
<?php
/* * *****websocket类库第n版****** */
class websocket {
public $log;
public $event;
public $signets;
public $users;
public $master;
public $push;
public function __construct($config) {
// if (substr(php_sapi_name(), 0, 3) !== 'cli') {
// die("请通过命令行模式运行!");
// }
error_reporting(E_ALL);
set_time_limit(0);
ob_implicit_flush();
$this->event = $config['event'];
$this->push = $config['push'];
$this->log = $config['log'];
$this->master = $this->WebSocket($config['address'], $config['port']);
$this->sockets = array('s' => $this->master);
}
function WebSocket($address, $port) {
$server = $this->soc = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
// $this->soc =$this->createSocket($address,$port);
$this->socs = array($this->soc);
socket_set_option($server, SOL_SOCKET, SO_REUSEADDR, 1);
socket_bind($server, $address, $port);
socket_listen($server);
$this->log('Start Listenning: ' . $address . ' : ' . $port);
return $server;
}
function run() {
while (true) {
$changes = $this->sockets;
@socket_select($changes, $write = NULL, $except = NULL, NULL);
foreach ($changes as $sign) {
if ($sign == $this->master) {
$client = socket_accept($this->master);
$this->sockets[] = $client;
$user = array(
'socket' => $client,
'hand' => false,
);
$this->users[] = $user;
$k = $this->search($client);
$eventreturn = array('k' => $k, 'sign' => $sign);
$this->eventoutput('in', $eventreturn);
} else {
$len = socket_recv($sign, $buffer, 20480, 0);
// $receive_str = '接收的客户端传输字节长度为';
// print_r(iconv("UTF-8", "GBK", $receive_str) . $len);
$k = $this->search($sign);
$user = $this->users[$k];
// var_dump('The Value Of user is ' . $client);
//写入数据库
$receive_data = json_decode($this->decode($buffer), true);
// var_dump($receive_data);
$hardwareKey = $receive_data['req']['hardwareKey'];
var_dump('HardwareKey is ' . $hardwareKey);
if (!empty($hardwareKey)) {
$user_arr = array(
'k' => $k,
'hardwareKey' => $hardwareKey
);
// var_dump($user_arr);
//// var_dump($user_arr);
$pdo = new PDO("mysql:host=192.168.1.50;dbname=lonbon_distribute", "root", "lonbon");
$sth = $pdo->prepare('INSERT INTO sock (ws_id,hardwareKey) VALUES (:ws_id, :hardwareKey)');
$params = array(
'ws_id' => $user_arr['k'],
'hardwareKey' => $user_arr['hardwareKey'],
);
$sth->execute($params);
}
//如果接受的是0字节
if ($len < 7) {
$this->close($sign);
$eventreturn = array('k' => $k, 'sign' => $sign);
$this->eventoutput('out', $eventreturn);
continue;
}
if (!$this->users[$k]['hand']) {//没有握手进行握手
$this->handshake($k, $buffer);
} else {
$buffer = $this->decode($buffer); //存储客户端传输字节
$eventreturn = array('k' => $k, 'sign' => $sign, 'msg' => $buffer);
$this->eventoutput('msg', $eventreturn);
// $this->pushoutput('activePush', $eventreturn);
// $this->write($k,'addasdas');
// $this->idwrite($sign, 'Register Message Success');
}
}
}
}
}
/* * ****广播消息第n版***** */
function broadcast($data) {
foreach ($this->sockets as $socket) {
if ($socket['resource'] == $this->master) {
continue;
}
$this->write($socket['resource'], $data);
}
}
function search($sign) {//通过标示遍历获取id
foreach ($this->users as $k => $v) {
if ($sign == $v['socket'])
return $k;
}
return false;
}
function close($sign) {//通过标示断开连接
$k = array_search($sign, $this->sockets);
socket_close($sign);
unset($this->sockets[$k]);
unset($this->users[$k]);
}
function handshake($k, $buffer) {
$buf = substr($buffer, strpos($buffer, 'Sec-WebSocket-Key:') + 18);
$key = trim(substr($buf, 0, strpos($buf, "\r\n")));
$new_key = base64_encode(sha1($key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));
$new_message = "HTTP/1.1 101 Switching Protocols\r\n";
$new_message .= "Upgrade: websocket\r\n";
$new_message .= "Sec-WebSocket-Version: 13\r\n";
$new_message .= "Connection: Upgrade\r\n";
$new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";
socket_write($this->users[$k]['socket'], $new_message, strlen($new_message));
$this->users[$k]['hand'] = true;
return true;
}
//解包
function uncode($str) {
$mask = array();
$data = '';
$msg = unpack('H*', $str);
$head = substr($msg[1], 0, 2);
if (hexdec($head{1}) === 8) {
$data = false;
} else if (hexdec($head{1}) === 1) {
$mask[] = hexdec(substr($msg[1], 4, 2));
$mask[] = hexdec(substr($msg[1], 6, 2));
$mask[] = hexdec(substr($msg[1], 8, 2));
$mask[] = hexdec(substr($msg[1], 10, 2));
$s = 12;
$e = strlen($msg[1]) - 2;
$n = 0;
for ($i = $s; $i <= $e; $i+= 2) {
$data .= chr($mask[$n % 4] ^ hexdec(substr($msg[1], $i, 2)));
$n++;
}
//发送数据到客户端
//如果长度大于125 将数据分块
$block = str_split($data, 1024);
$mess = array(
'mess' => $block[0],
);
// return json_encode($mess,TRUE);
}
// return $data;
return json_encode($mess, TRUE);
}
//*************************/
/** * 打包 * @param string $buffer */
function encode($buffer) {
$len = strlen($buffer);
if ($len <= 125) {
return "\x81" . chr($len) . $buffer;
} else if ($len <= 65535) {
return "\x81" . chr(126) . pack("n", $len) . $buffer;
} else {
//pack("xxxN", $len)pack函数只处理2的32次方大小的文件,实际上2的32次方已经4G了。
return "\x81" . char(127) . pack("xxxxN", $len) . $buffer;
}
}
/** * 解包 * @param string $buffer * @return string */
function decode($buffer) {
$len = $masks = $data = $decoded = null;
$len = ord($buffer[1]) & 127;
if ($len === 126) {
$masks = substr($buffer, 4, 4);
$data = substr($buffer, 8);
} else if ($len === 127) {
$masks = substr($buffer, 10, 4);
$data = substr($buffer, 14);
} else {
$masks = substr($buffer, 2, 4);
$data = substr($buffer, 6);
} for ($index = 0; $index < strlen($data); $index++) {
$decoded .= $data[$index] ^ $masks[$index % 4];
} return $decoded;
}
function ord_hex($data) {
$msg = '';
$l = strlen($data);
for ($i = 0; $i < $l; $i++) {
$msg .= dechex(ord($data{$i}));
}
return $msg;
}
function idwrite($id, $t) {//通过id推送信息
if (!$this->users[$id]['socket']) {
return false;
}//没有这个标示
$t = $this->decode($t);
return socket_write($this->users[$id]['socket'], $t, strlen($t));
}
function write($k, $t) {//通过标示推送信息
$t = $this->encode($t);
return socket_write($k, $t, strlen($t));
}
function eventoutput($type, $event) {//事件回调
call_user_func($this->event, $type, $event);
}
function pushoutput($type, $event) {
//推送回调
call_user_func($this->push, $type, $event);
}
function log($t) {//控制台输出
if ($this->log) {
$t = $t . "\r\n";
fwrite(STDOUT, iconv('utf-8', 'gbk//IGNORE', $t));
}
}
public function hands($client, $buff, $v) {
//提取websocket传的key并进行加密 (这是固定的握手机制获取Sec-WebSocket-Key:里面的key)
$buf = substr($buff, strpos($buff, 'Sec-WebSocket-Key:') + 18);
//去除换行空格字符
$key = trim(substr($buf, 0, strpos($buf, "\r\n")));
//固定的加密算法
$new_key = base64_encode(sha1($key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));
$new_message = "HTTP/1.1 101 Switching Protocols\r\n";
$new_message .= "Upgrade: websocket\r\n";
$new_message .= "Sec-WebSocket-Version: 13\r\n";
$new_message .= "Connection: Upgrade\r\n";
$new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";
//将套接字写入缓冲区
socket_write($v, $new_message, strlen($new_message));
// socket_write(socket,$upgrade.chr(0), strlen($upgrade.chr(0)));
//标记此套接字握手成功
$this->hand[(int) $client] = true;
}
/* websocket主动推送数据 */
function sendWsMessage($message) {
$changes = $this->sockets;
@socket_select($changes, $write = NULL, $except = NULL, NULL);
foreach ($changes as $sign) {
//如果有新的client连接进来,则
if ($sock == $this->master) {
//接受一个socket连接
$client = socket_accept($this->master);
//给新连接进来的socket一个唯一的ID
$key = uniqid();
$this->sockets[] = $client; //将新连接进来的socket存进连接池
$this->users[$key] = array(
'socket' => $client, //记录新连接进来client的socket信息
'hand' => false //标志该socket资源没有完成握手
);
//否则1.为client断开socket连接,2.client发送信息
} else {
$len = 0;
$buffer = '';
//读取该socket的信息,注意:第二个参数是引用传参即接收数据,第三个参数是接收数据的长度
do {
$l = socket_recv($sock, $buffer, 1000, 0);
$len+=$l;
$buffer.=$message;
} while ($l == 1000);
//根据socket在user池里面查找相应的$k,即健ID
$k = $this->search($sock);
//如果接收的信息长度小于7,则该client的socket为断开连接
if ($len < 7) {
//给该client的socket进行断开操作,并在$this->sockets和$this->users里面进行删除
$this->close($k);
//$this->send2($k);
continue;
}
//判断该socket是否已经握手
if (!$this->users[$k]['hand']) {
//如果没有握手,则进行握手处理
$this->handshake($k, $buffer);
} else {
//走到这里就是该client发送信息了,对接受到的信息进行uncode处理
$buffer = $this->uncode($buffer, $k);
if ($buffer == false) {
continue;
}
//如果不为空,则进行消息推送操作
$this->write($k, $buffer);
}
}
}
}
function getusers() {
$ar = array();
foreach ($this->users as $k => $v) {
$ar[] = array('code' => $k, 'name' => $v['name']);
}
return $ar;
}
}
/*****************************************************************/
入口文件ws_start.php
<?php
/**
* websocket 入口文件
* 实现功能及项目总结
* 1.接收客户端注册websocket的相关接口
* 2.心跳包监听
* 3.推送消息和其他业务模块实现
* 4.新增配置项,优化结构
* 5.注意异常处理
*
* errorLog 错误 100 接收json数据错误
* 101 参数错误(hardWareKey)
* */
/* 定义入口文件 */
header("Content-type: text/html; charset=utf-8");
//header("Content-type: text/html; charset=gb2312");
define('DIR_WEBSOCKET', dirname(__FILE__));
require('lib/app/websocket.class.php');
$config = array(
'address' => '192.168.1.38',
'port' => '11100',
'event' => 'WSevent',
// 'push'=>'sendWsMessage',
'log' => 'true',
);
$websocket = new Websocket($config);
//$websocket->sendWsMessage('qwqwqewqew');
$websocket->run();
$ws_func = new ws_function();
//function WSevent($type,$event){
// global $websocket;
// switch ($type){
// case 'register':
// $websocket->log('客户端注册标识:'.$event['hardwareKey']);
// break;
// case 'heartbeat':
// $websocket->log('监听心跳包:'.$event['heartbeat']);
// break;
//
//
// }
//}
function WSevent($type, $event,$is_activesend =0) {
global $websocket;
if ('in' == $type) {
$websocket->log('Client Enter Id:' . $event['k']);
} elseif ('out' == $type) {
$websocket->log('Client Loginout Id:' . $event['k']);
} elseif ('msg' == $type) {
$websocket->log($event['k'] . 'Message:' . $event['msg']);
roboot($event['sign'], $event['msg']);
} elseif ('push' == $type) {
$websocket->write($event['sign'], 'ClientPush Success');
}
}
function roboot($sign, $t) {
global $websocket;
global $ws_func;
//$TxtFileName = "E:\WWW\Demo.txt";
//if( ($TxtRes=fopen ($TxtFileName,"w+")) === FALSE){
//echo("Create a writable file:".$TxtFileName."Failed");
//exit();
//}
//if(!fwrite ($TxtRes,$t)){ //将信息写入文件
////echo ("Try To Write Into".$TxtFileName."Write Into".$StrConents."Failed!");
//fclose($TxtRes);
//exit();
//}
// echo ("write to file".$TxtFileName."Write Into".$t."SUCCESS!");
// fclose ($TxtRes);
$receive_data = json_decode($t, TRUE);
switch ($receive_data['action']) {
case 'login':
if (!empty($receive_data)) {
$hardwareKey = $receive_data['req']['hardwareKey'];
$seq_id = intval($receive_data['seq_id']);
if (empty($hardwareKey)) {
$error_msg = array(
"action" => 'errorLog',
"req" => array(
"code" => '101',
"status" => 'hardwareKey Can Not Empty!'
),
);
$error_msg_json = json_encode($error_msg);
$show = $error_msg_json;
} else {
$data = array(
"action" => "login",
"req" => array(
"code" => "200",
"status" => "Succeess"
),
"req_event" => 1,
"seq_id" => $seq_id,
);
$show = json_encode($data);
}
} else {
$error_msg = array(
"action" => 'errorLog',
"req" => array(
"code" => '100',
"status" => 'Receive Json Data Can Not Empty!'
),
);
}
break;
case 'name':
$show = 'Robot';
break;
case 'time':
$show = 'Now Time:' . date('Y-m-d H:i:s');
break;
default:
$show = 'Robotting...';
}
$websocket->write($sign, $show);
}
/*websocket主动推送数据*/
function sendWsMessage_1($message){
// $websocket->eventoutput('activePush', $eventreturn);
$changes = $this->sockets;
@socket_select($changes, $write = NULL, $except = NULL, NULL);
foreach ($changes as $sign){
//如果有新的client连接进来,则
if($sock==$this->master){
// 接受一个socket连接
$client=socket_accept($this->master);
//给新连接进来的socket一个唯一的ID
$key=uniqid();
$this->sockets[]=$client; //将新连接进来的socket存进连接池
$this->users[$key]=array(
'socket'=>$client, //记录新连接进来client的socket信息
'hand'=>false //标志该socket资源没有完成握手
);
//否则1.为client断开socket连接,2.client发送信息
}else{
$len=0;
$buffer='';
//读取该socket的信息,注意:第二个参数是引用传参即接收数据,第三个参数是接收数据的长度
do{
$l=socket_recv($sock,$message,1000,0);
$len+=$l;
$buffer.=$message;
}while($l==1000);
//根据socket在user池里面查找相应的$k,即健ID
$k=$this->search($sock);
//如果接收的信息长度小于7,则该client的socket为断开连接
if($len<7){
//给该client的socket进行断开操作,并在$this->sockets和$this->users里面进行删除
$this->close($k);
continue;
}
//判断该socket是否已经握手
if(!$this->users[$k]['hand']){
//如果没有握手,则进行握手处理
$this->handshake($k,$buffer);
}else{
//走到这里就是该client发送信息了,对接受到的信息进行uncode处理
$buffer = $this->uncode($buffer,$k);
if($buffer==false){
continue;
}
//如果不为空,则进行消息推送操作
$this->write($k,$buffer);
}
}
}
}
?>
/*********************************************************************************/
原生第二版
服务端
<?php
//require_once('/lib/data/pdoconnect.class.php');
//下面是sock类
class Sock {
public $sockets; //socket的连接池,即client连接进来的socket标志
public $users; //所有client连接进来的信息,包括socket、client名字等
public $master; //socket的resource,即前期初始化socket时返回的socket资源
private $sda = array(); //已接收的数据
private $slen = array(); //数据总长度
private $sjen = array(); //接收数据的长度
private $ar = array(); //加密key
private $n = array();
public function __construct($address, $port) {
//创建socket并把保存socket资源在$this->master
$this->master = $this->WebSocket($address, $port);
// var_dump($this->master);
// print_r('000000'.$this->master."000000");
//创建socket连接池
$this->sockets = array($this->master);
// var_dump($this->sockets);
// if (substr(php_sapi_name(), 0, 3) !== 'cli') {
// die("请通过命令行模式运行!");
// $pdo = new PDO("mysql:host=192.168.1.50;dbname=lonbon_distribute", "root", "lonbon");
// $stmt = $pdo->prepare("DELETE FROM sock");
// }
}
function run() {
while (true) {
$changes = $this->sockets; //$changes由多变1,但$this->sockets却只是稳定的+1;
var_dump($changes);
$write = NULL;
$except = NULL;
$rr = socket_select($changes, $write, $except, NULL);
foreach ($changes as $sock) {
//连接主机的client
if ($sock == $this->master) { //---此处只用来存数据了
$client = socket_accept($this->master); //resource(3, Socket)。表示接受请求,并创建一个子链接!!
//var_dump($client);
//exit;
// $key=uniqid();
$this->sockets[] = $client;
$this->users[] = array(
'socket' => $client,
'shou' => false
);
} else { //---此处服务器与客户端发信息
$len = 0;
$buffer = '';
do {
$l = socket_recv($sock, $buf, 20480, 0); //原来取数据是一个缓慢的过程,要一次一次取数据,并计算每次buf的长度,让总长度不超过设定值
$len+=$l;
$buffer.=$buf;
// print_r($this->decode($buffer));
} while ($l == 20480);
$k = $this->search($sock); //跟据sock返回key值
var_dump('The Key Value Is ' . $k);
$receive_data = json_decode($this->decode($buffer), true);
// var_dump($receive_data);
$hardwareKey = $receive_data['req']['hardwareKey'];
if (!empty($hardwareKey)) {
$user_arr = array(
'k' => $k,
'hardwareKey' => $hardwareKey
);
// var_dump($user_arr);
//// var_dump($user_arr);
$pdo = new PDO("mysql:host=192.168.1.50;dbname=lonbon_distribute", "root", "lonbon");
$sth = $pdo->prepare('INSERT INTO sock (ws_id,hardwareKey) VALUES (:ws_id, :hardwareKey)');
$params = array(
'ws_id' => $user_arr['k'],
'hardwareKey' => $user_arr['hardwareKey']
);
$sth->execute($params);
// $socket_id = $dbh->lastInsertId();
}
if ($len < 7) { //发过来的消息太短了,系统就判断 断了,断掉链接。
$this->send2($k); //用户退出。1关闭这个$key值对应的socket、删除这条key记录。将sockets数组对象重新排列。
//2
continue;
}
if (!$this->users[$k]['shou']) {//判断用户的握手字段是true?否则重新握手。
$this->woshou($k, $buffer);
//file_put_contents('lz.text','woshou', FILE_APPEND);
} else { //如果用户已经握手,则与用户之间进行通信。终于可以发消息了!
$buffer = $this->decode($buffer, $k); //返编译
if ($buffer == false) {
continue;
}
$buffer_n = json_decode($buffer, true);
$action = $buffer_n['action'];
if ($action == 'login') {
$this->send($k, $buffer);
} elseif ($action == 'push') {
var_dump("ACTIVE PUSH MODULE GO");
$this->s_send($k, $msg);
}
}
}
}
}
}
//指定关闭$k对应的socket
function close($k) {
//断开相应socket
socket_close($this->users[$k]['socket']);
//删除相应的user信息
unset($this->users[$k]);
//重新定义sockets连接池
$this->sockets = array($this->master);
foreach ($this->users as $v) {
$this->sockets[] = $v['socket'];
}
//输出日志
$this->e("key:$k close");
}
//根据sock在users里面查找相应的$k
function search($sock) {
foreach ($this->users as $k => $v) {
if ($sock == $v['socket'])
return $k;
}
return false;
}
//根据sock在users里面查找相应的$k
function search_1($sock) {
foreach ($this->users as $k => $v) {
if ($sock == $v['socket'])
return $v['socket'];
}
return false;
}
//传相应的IP与端口进行创建socket操作
function WebSocket($address, $port) {
$server = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
socket_set_option($server, SOL_SOCKET, SO_REUSEADDR, 1); //1表示接受所有的数据包
socket_bind($server, $address, $port);
socket_listen($server);
// $this->e('Server Started : ' . date('Y-m-d H:i:s'));
// $this->e('Listening on : ' . $address . ' port ' . $port);
return $server;
}
/*
* 函数说明:对client的请求进行回应,即握手操作
* @$k clien的socket对应的健,即每个用户有唯一$k并对应socket
* @$buffer 接收client请求的所有信息
*/
function woshou($k, $buffer) {
//截取Sec-WebSocket-Key的值并加密,其中$key后面的一部分258EAFA5-E914-47DA-95CA-C5AB0DC85B11字符串应该是固定的
$buf = substr($buffer, strpos($buffer, 'Sec-WebSocket-Key:') + 18);
$key = trim(substr($buf, 0, strpos($buf, "\r\n")));
$new_key = base64_encode(sha1($key . "258EAFA5-E914-47DA-95CA-C5AB0DC85B11", true));
//按照协议组合信息进行返回
$new_message = "HTTP/1.1 101 Switching Protocols\r\n";
$new_message .= "Upgrade: websocket\r\n";
$new_message .= "Sec-WebSocket-Version: 13\r\n";
$new_message .= "Connection: Upgrade\r\n";
$new_message .= "Sec-WebSocket-Accept: " . $new_key . "\r\n\r\n";
socket_write($this->users[$k]['socket'], $new_message, strlen($new_message));
//对已经握手的client做标志
$this->users[$k]['shou'] = true;
return true;
}
//解码函数
function uncode($str, $key) {
$mask = array();
$data = '';
$msg = unpack('H*', $str);
$head = substr($msg[1], 0, 2);
if ($head == '81' && !isset($this->slen[$key])) {
$len = substr($msg[1], 2, 2);
$len = hexdec($len); //把十六进制的转换为十进制
if (substr($msg[1], 2, 2) == 'fe') {
$len = substr($msg[1], 4, 4);
$len = hexdec($len);
$msg[1] = substr($msg[1], 4);
} else if (substr($msg[1], 2, 2) == 'ff') {
$len = substr($msg[1], 4, 16);
$len = hexdec($len);
$msg[1] = substr($msg[1], 16);
}
$mask[] = hexdec(substr($msg[1], 4, 2));
$mask[] = hexdec(substr($msg[1], 6, 2));
$mask[] = hexdec(substr($msg[1], 8, 2));
$mask[] = hexdec(substr($msg[1], 10, 2));
$s = 12;
$n = 0;
} else if ($this->slen[$key] > 0) {
$len = $this->slen[$key];
$mask = $this->ar[$key];
$n = $this->n[$key];
$s = 0;
}
$e = strlen($msg[1]) - 2;
for ($i = $s; $i <= $e; $i+= 2) {
$data .= chr($mask[$n % 4] ^ hexdec(substr($msg[1], $i, 2)));
$n++;
}
$dlen = strlen($data);
if ($len > 255 && $len > $dlen + intval($this->sjen[$key])) {
$this->ar[$key] = $mask;
$this->slen[$key] = $len;
$this->sjen[$key] = $dlen + intval($this->sjen[$key]);
$this->sda[$key] = $this->sda[$key] . $data;
$this->n[$key] = $n;
return false;
} else {
unset($this->ar[$key], $this->slen[$key], $this->sjen[$key], $this->n[$key]);
$data = $this->sda[$key] . $data;
unset($this->sda[$key]);
return $data;
}
}
//与uncode相对
function code($msg) {
$frame = array();
$frame[0] = '81';
$len = strlen($msg);
if ($len < 126) {
$frame[1] = $len < 16 ? '0' . dechex($len) : dechex($len);
} else if ($len < 65025) {
$s = dechex($len);
$frame[1] = '7e' . str_repeat('0', 4 - strlen($s)) . $s;
} else {
$s = dechex($len);
$frame[1] = '7f' . str_repeat('0', 16 - strlen($s)) . $s;
}
$frame[2] = $this->ord_hex($msg);
$data = implode('', $frame);
return pack("H*", $data);
}
/** * 打包 * @param string $buffer */
function encode($buffer) {
$len = strlen($buffer);
if ($len <= 125) {
return "\x81" . chr($len) . $buffer;
} else if ($len <= 65535) {
return "\x81" . chr(126) . pack("n", $len) . $buffer;
} else {
//pack("xxxN", $len)pack函数只处理2的32次方大小的文件,实际上2的32次方已经4G了。
return "\x81" . char(127) . pack("xxxxN", $len) . $buffer;
}
}
/** * 解包 * @param string $buffer * @return string */
function decode($buffer) {
$len = $masks = $data = $decoded = null;
$len = ord($buffer[1]) & 127;
if ($len === 126) {
$masks = substr($buffer, 4, 4);
$data = substr($buffer, 8);
} else if ($len === 127) {
$masks = substr($buffer, 10, 4);
$data = substr($buffer, 14);
} else {
$masks = substr($buffer, 2, 4);
$data = substr($buffer, 6);
} for ($index = 0; $index < strlen($data); $index++) {
$decoded .= $data[$index] ^ $masks[$index % 4];
} return $decoded;
}
function ord_hex($data) {
$msg = '';
$l = strlen($data);
for ($i = 0; $i < $l; $i++) {
$msg .= dechex(ord($data{$i}));
}
return $msg;
}
//用户加入
function send($k, $msg) {
parse_str($msg, $g); //把查询字符串解析到变量中
$g1 = json_decode($json);
var_dump($g1);
$ar = array();
if ($g['type'] == 'add') {
$this->users[$k]['name'] = $g['ming'];
$ar['type'] = 'add';
$ar['name'] = $g['ming'];
$key = 'all';
} else {
// $ar['nrong'] = $g['nr'];
$key = $g['key'];
}
$receive_data = json_decode($msg, true);
$seq_id = intval($receive_data['seq_id']);
$this->send1($k, $ar, $key, $seq_id);
}
//用户加入
function s_send($k, $type) {
if ($type == 'push') {
// parse_str($msg, $g); //把查询字符串解析到变量中
$ar = array();
if ($g['type'] == 'add') {
$this->users[$k]['name'] = $g['ming'];
$ar['type'] = 'add';
$ar['name'] = $g['ming'];
$key = 'all';
} else {
// $ar['nrong'] = $g['nr'];
$key = $g['key'];
}
$receive_data = json_decode($msg, true);
$seq_id = intval($receive_data['seq_id']);
$this->send3($k, $ar, $key, $seq_id);
}else{
return false;
}
}
//对新加入的client推送已经在线的client
function getusers() {
$ar = array();
foreach ($this->users as $k => $v) {
$ar[] = array('code' => $k, 'name' => $v['name']);
}
return $ar;
}
function send3($k, $ar, $key = 'all', $seq_id) {
$data = array(
"action" => "recmsg",
"req" => array(
"Command" => "update"
),
"req_event" => 1,
"seq_id" => $seq_id,
);
$ar['action'] = $data["action"];
$ar["req"] = $data["req"];
$ar["req_event"] = $data["req_event"];
$ar["seq_id"] = $data["seq_id"];
// $ar['code'] = $k;
// $ar['time'] = date('m-d H:i:s');
//对发送信息进行编码处理
$str = $this->encode(json_encode($ar));
//面对大家即所有在线者发送信息
if ($key == 'all') {
$users = $this->users;
//如果是add表示新加的client
if ($ar['type'] == 'push') {
$ar['type'] = 'mpush';
$ar['users'] = $this->getusers(); //取出所有在线者,用于显示在在线用户列表中
$str1 = $this->code(json_encode($ar)); //单独对新client进行编码处理,数据不一样
var_dump($str1);
//对新client自己单独发送,因为有些数据是不一样的
socket_write($users[$k]['socket'], $str1, strlen($str1));
//上面已经对client自己单独发送的,后面就无需再次发送,故unset
// unset($users[$k]);
}
//除了新client外,对其他client进行发送信息。数据量大时,就要考虑延时等问题了
foreach ($users as $v) {
socket_write($v['socket'], $str, strlen($str));
}
} else {
//单独对个人发送信息,即双方聊天
socket_write($this->users[$k]['socket'], $str, strlen($str));
socket_write($this->users[$key]['socket'], $str, strlen($str));
}
}
function send1($k, $ar, $key = 'all', $seq_id) {
$data = array(
"action" => "login",
"req" => array(
"code" => "200",
"status" => "Succeess"
),
"req_event" => 1,
"seq_id" => $seq_id,
);
$ar['action'] = $data["action"];
$ar["req"] = $data["req"];
$ar["req_event"] = $data["req_event"];
$ar["seq_id"] = $data["seq_id"];
// $ar['code'] = $k;
// $ar['time'] = date('m-d H:i:s');
//对发送信息进行编码处理
$str = $this->encode(json_encode($ar));
//面对大家即所有在线者发送信息
if ($key == 'all') {
$users = $this->users;
//如果是add表示新加的client
if ($ar['type'] == 'add') {
$ar['type'] = 'madd';
$ar['users'] = $this->getusers(); //取出所有在线者,用于显示在在线用户列表中
$str1 = $this->code(json_encode($ar)); //单独对新client进行编码处理,数据不一样
//对新client自己单独发送,因为有些数据是不一样的
socket_write($users[$k]['socket'], $str1, strlen($str1));
//上面已经对client自己单独发送的,后面就无需再次发送,故unset
// unset($users[$k]);
} elseif ($ar['type'] == 'push') {
$ar['type'] = 'mpush';
$ar['users'] = $this->getusers();
$str2 = $this->$this->code(json_encode($ar));
socket_write($users[$k]['socket'], $str1, strlen($str2));
}
//除了新client外,对其他client进行发送信息。数据量大时,就要考虑延时等问题了
foreach ($users as $v) {
socket_write($v['socket'], $str, strlen($str));
}
} else {
//单独对个人发送信息,即双方聊天
socket_write($this->users[$k]['socket'], $str, strlen($str));
socket_write($this->users[$key]['socket'], $str, strlen($str));
}
}
//用户退出向所用client推送信息
function send2($k) {
$this->close($k);
$ar['type'] = 'rmove';
$ar['nrong'] = $k;
$this->send1(false, $ar, 'all');
}
//记录日志
function e($str, $data) {
//$path=dirname(__FILE__).'/log.txt';
$str = $str . "\n";
//error_log($str,3,$path);
//编码处理
echo iconv('utf-8', 'gbk//IGNORE', $str);
}
function bd_send($sign, $t) {
$receive_data = json_decode($t, TRUE);
switch ($receive_data['action']) {
case 'login':
if (!empty($receive_data)) {
$hardwareKey = $receive_data['req']['hardwareKey'];
$seq_id = intval($receive_data['seq_id']);
print_r("receive SeQ_ID IS " . $hardwareKey);
if (empty($hardwareKey)) {
$error_msg = array(
"action" => 'errorLog',
"req" => array(
"code" => '101',
"status" => 'hardwareKey Can Not Empty!'
),
);
$error_msg_json = json_encode($error_msg);
$show = $error_msg_json;
} else {
$data = array(
"action" => "login",
"req" => array(
"code" => "200",
"status" => "Succeess"
),
"req_event" => 1,
"seq_id" => $seq_id,
);
$show = json_encode($data);
var_dump('RETURN DATA IS' . $show);
}
} else {
$error_msg = array(
"action" => 'errorLog',
"req" => array(
"code" => '100',
"status" => 'Receive Json Data Can Not Empty!'
),
);
}
break;
case 'name':
$show = 'Robot';
break;
case 'time':
$show = 'Now Time:' . date('Y-m-d H:i:s');
break;
default:
$data = array(
"action" => "login",
"req" => array(
"code" => "200",
"status" => "Succeess"
),
"req_event" => 1,
"seq_id" => $seq_id,
);
$show = json_encode($data);
}
$this->write($sign, $show);
}
function write($k, $t) {//通过标示推送信息
// var_dump('RECEIVE WRITE k IS: ' . $k);
// $t = $this->decode($t);
var_dump('Send Data Is: ' . $t);
return socket_write($k, $t, strlen($t));
}
}
?>
/**********************************************************************************/
入口文件
<?php
require('lib/app/sock.class.php');
//require('lib/data/display.data.class.php');
error_reporting(E_ALL ^ E_NOTICE);
ob_implicit_flush();
//require_once('../data/data.class.php');
//地址与接口,即创建socket时需要服务器的IP和端口
$sk = new Sock('192.168.1.38', 11100);
//对创建的socket循环进行监听,处理数据
$sk->run();
在五月天的倔强中我改用了网上的swoole+redis的思路,为什么不钻了?因为直觉是错觉啊o(╥﹏╥)o。尝试下是可以实现主推的(参考http://blog.csdn.net/yeshencat/article/details/72900895),但思路不一样的是把服务端的项目整体看做客户端,实现了整体推送,就swoole+redis给了思路,用swoole+mysql肯定也可以实现。经过一周的爬坑,将swoole和redis看入门了,毕竟之前原生弄了那么久o(╥﹏╥)o。只剩下如何和当前项目关联了,主动推送实现了不过是广播,关于业务中关联指定的客户端id和对应的websocket唯一标识这块也实现了,剩下部分就是关联客户端id带入推送了。