搜索
首页php框架Swooleswoole如何实现实时推送

swoole如何实现实时推送

Dec 09, 2019 am 09:14 AM
swoole

swoole如何实现实时推送

 swoole+Redis实现实时数据推送           (推荐学习: swoole视频教程

<?php
/**
 * ***************************************
 *            单进程保护                 *
 * ***************************************
 */
$phpSelf 			= realpath($_SERVER[&#39;PHP_SELF&#39;]);
$lockFile			= $phpSelf.&#39;.lock&#39;;
$lockFileHandle 	= fopen($lockFile, "w");
if ($lockFileHandle == false) {
	exit("Can not create lock file $lockFile\n");
}
if (!flock($lockFileHandle, LOCK_EX + LOCK_NB)) {
	exit(date("Y-m-d H:i:s")."Process already exist.\n");
}
 
/**
 * ***************************************
 *     进入程序,定义相关配置            *
 * ***************************************
 */
set_time_limit(0);
//socket会话的超时时间,根据业务场景设置,这里设置为永不超时
//如果设置了时间,则从socket建立=>传输=>关闭整个过程必须在定义的时间内完成,否则自动close该socket并抛出warning
ini_set(&#39;default_socket_timeout&#39;, -1);
$conf = array(
	&#39;listen&#39;  => array(&#39;host&#39; => &#39;0.0.0.0&#39;,&#39;port&#39; => &#39;8008&#39;),
	&#39;setting&#39; => array(
		//程序允许的最大连接数,用以设置server最大允许维持多少个TCP连接,超过该数量后,新连接将被拒绝,默认为ulimit -n的值,如果设置大于ulimit -n则强制重置为ulimit- n,如果确实需要设置超过ulimit -n的值,请修改系统值 vim /etc/security/limits.conf 修改nofile的值
		"max_conn"			=> 1024,
		//启用CPU亲和设置(在全异步非阻塞是可启用),在多核的服务器中,启用此特性会将swoole的reactor线程/worker进程绑定到固定的一个核上。可以避免进程/线程的运行时在多个核之间互相切换,提高CPU Cache的命中率,如何确定绑定在了哪个核上,请参考文档, 查看命令: taskset -p 进程id
		&#39;open_cpu_affinity&#39;	=> 0,
		//配置task进程数量,配置此参数后将会启用task功能。所以Server务必要注册onTask、onFinish2个事件回调函数。如果没有注册,服务器程序将无法启动.Task进程是同步阻塞的,配置方式与Worker同步模式一致。
		&#39;task_worker_num&#39;	=> 20,
		//设置task进程的最大任务数。一个task进程在处理完超过此数值的任务后将自动退出。这个参数是为了防止PHP进程内存溢出。如果不希望进程自动退出可以设置为0, 默认是0
		&#39;task_max_request&#39;	=> 1024, 
		//设置task的数据临时目录,在swoole_server中,如果投递的数据超过8192字节,将启用临时文件来保存数据。这里的task_tmpdir就是用来设置临时文件保存的位置。
		&#39;task_tmpdir&#39;		=> &#39;/tmp/&#39;,
		//worker进程数量,根据业务代码的模式作调整,全异步非阻塞可设置为CPU核数的1-4倍;同步阻塞,请参考文档调整
		&#39;worker_num&#39;		=> 8,
		//指定swoole错误日志文件
		&#39;log_file&#39; 			=> &#39;/tmp/log/log.txt&#39;,
		//SSL公钥和私钥的位置,启用wss必须在编译swoole时加入--enable-openssl选项
		&#39;ssl_cert_file&#39;		=> &#39;/usr/local/nginx/conf/server.cer&#39;,
		&#39;ssl_key_file&#39;		=> &#39;/usr/local/nginx/conf/server.key&#39;,
	),
);
 
/**
 * ***************************************
 *       初始化Redis连接                 *
 * ***************************************
 */
$redis = null;
$redis = new Redis();
$redis->connect(REDIS_HOST, REDIS_PORT);
$redis->auth(REDIS_PWD);
$GLOBALS[&#39;redis&#39;]=$redis;
 
/**
 * ***************************************
 *        脚本重启时,清除历史的数据     *
 * ***************************************
 */
$sArr = $redis->sMembers(REDIS_S_KEY);
if (!empty($sArr)) {
	foreach ((array)$sArr as $key => $sc) {
		$fdArr = $redis->sMembers(REDIS_S_FD.$sc);
		foreach ((array)$fdArr as $k => $fd) {
			$res1 = $redis->del(REDIS_FD_S.$fd);
		}
		$res2 = $redis->del(REDIS_S_FD.$sc);
	}
	$redis->del(REDIS_S_KEY);
}
$redis->del(REDIS_ZS_KEY);
 
/**
 * ***************************************
 *           绑定回调事件                *
 * ***************************************
 */
$ws = null;
//wss服务
$ws = new swoole_websocket_server($conf[&#39;listen&#39;][&#39;host&#39;], $conf[&#39;listen&#39;][&#39;port&#39;], SWOOLE_PROCESS, SWOOLE_SOCK_TCP | SWOOLE_SSL);
$ws->set($conf[&#39;setting&#39;]);
 
/**
 * Server启动在主进程的主线程回调此函数
 * 在此事件之前Swoole Server已进行了如下操作
 * 已创建了manager进程
 * 已创建了worker子进程
 * 已监听所有TCP/UDP端口
 * 已监听了定时器
 * 在onStart中创建的全局资源对象不能在worker进程中被使用,因为发生onStart调用时,worker进程已经创建好了。新创建的对象在主进程内,worker进程无法访问到此内存区域。因此全局对象创建的代码需要放置在swoole_server_start之前
 */
$ws->on(&#39;start&#39;, function ($ws) {
	swoole_set_process_name(PROCESS_NAME.&#39;_master&#39;);
});
 
/**
 * 与onStart回调在不同进程中并行执行的回调函数(不存在先后顺序)
 * @param: $ws swoole_websocket_server object
 * @param: $wid 创建该进程时swoole分配的id(不是进程id)
 * 注意点:
 * 1. 此事件在worker进程/task进程启动时发生。onWorkerStart/onStart是并发执行的,没有先后顺序,这里创建的对象可以在进程生命周期内使用
 * 2. swoole1.6.11之后task_worker中也会触发onWorkerStart,故而在下面的处理中,加入了判断业务类型$jobType是task还是work,如果是task则命名为****_Tasker_$id,如果是worker则命名为****_Worker_$id
 * 3. 发生PHP致命错误或者代码中主动调用exit时,Worker/Task进程会退出,管理进程会重新创建新的进程
 * 5. 如果想使用swoole_server_reload实现代码重载入,必须在workerStart中require你的业务文件,而不是在文件头部。在onWorkerStart调用之前已包含的文件,不会重新载入代码。
 * 6. 可以将公用的,不易变的php文件放置到onWorkerStart之前(例如上面的redis配置)。这样虽然不能重载入代码,但所有worker是共享的,不需要额外的内存来保存这些数据。
 * 7. onWorkerStart之后的代码每个worker都需要在内存中保存一份
 */
$ws->on(&#39;workerstart&#39;, function ($ws, $wid) {
	$jobType = $ws->taskworker ? &#39;Tasker&#39; : &#39;Worker&#39;;
	swoole_set_process_name(PROCESS_NAME.&#39;_&#39;.$jobType.&#39;_&#39;.$wid);
	$GLOBALS[&#39;ws&#39;] = $ws; //保存server对象到全局中以待使用
	if ($jobType == &#39;Worker&#39;) { //在某个worker进程上绑定redis订阅进程
		if ($wid === 0) {
            $dataRedis = null;
            $dataRedis = new Redis();
            $dataRedis->connect(REDIS_HOST_DATA, REDIS_PORT_DATA);
            $dataRedis->auth(REDIS_PWD_DATA);
            //使用psubscribe订阅指定模式的频道,这里*表示所有频道
            //请注意,redis订阅不提供区分库(db)的功能,所以多个库都同时在发布同一个名字的频道时,都将被订阅到
			$dataRedis->psubscribe(array("*"), "sendTask");
		}
	}
});
 
/**
 * 管理进程启用时,调用该回调函数
 * 注意manager进程中不能添加定时器
 * manager进程中可以调用sendMessage接口向其他工作进程发送消息
 */
$ws->on(&#39;managerstart&#39;, function ($ws) {
	swoole_set_process_name(PROCESS_NAME.&#39;_manage&#39;);
});
 
/**
 * swoole websocket服务特有的回调函数,此函数在websocket服务器中必须定义实现,否则websocket服务将无法启动
 * 当服务器收到来自客户端的数据帧时会回调此函数
 * @param: $ws为swoole_websocket_server对象,其结构在调试时可var_dump查看
 * @param: $frame为swoole_websocket_frame对象,包含了客户端发来的数据帧信息,包含以下四个属性:
 * @param: $frame->fd: 客户端的socket id,每个id对应一个客户端,推送消息的时候需要指定
 * @param: $frame->data: 数据内容,可以是文本内容或者是二进制数据(图片等),可以通过opcode的值来判断。$data 如果是文本类型,编码格式必然是UTF-8,这是WebSocket协议规定的
 * @param: $frame->opcode: WebSocket的OpCode类型,可以参考WebSocket协议标准文档, WEBSOCKET_OPCODE_TEXT = 0x1 ,文本数据; WEBSOCKET_OPCODE_BINARY = 0x2 ,二进制数据
 * @param: $frame->finish: 表示数据帧是否完整,一个WebSocket请求可能会分成多个数据帧进行发送
 * 注意点: 客户端发送的ping帧不会触发onMessage,底层会自动回复pong包
 */
$ws->on(&#39;message&#39;, function ($ws, $frame) {
    echo "Server has receive message\n";
    //接收到客户端请求,并建立连接之后,进行相应业务的处理
    handleClientData($ws, $frame);
});
 
/**
 * 在task_worker进程内被调用。worker进程可以使用swoole_server_task函数向task_worker进程投递新的任务(此处使用的是taskwait)
 * 当前的Task进程在调用onTask回调函数时会将进程状态切换为忙碌,这时将不再接收新的Task,当onTask函数返回时会将进程状态切换为空闲然后继续接收新的Task。
 * @param: $ws swoole_websocket_server object
 * @param: $tid task process id
 * @param: $wid from id 表示来自哪个Worker进程。$task_id和$wid组合起来才是全局唯一的,不同的worker进程投递的任务ID可能会有相同
 * @param: $data 需要执行的任务内容
 * 注意点: onTask函数执行时遇到致命错误退出,或者被外部进程强制kill,当前的任务会被丢弃,但不会影响其他正在排队的Task
 */
$ws->on(&#39;task&#39;, function ($ws, $tid, $wid, $data) {
	switch ($data[&#39;cmd&#39;]) {
		case &#39;pushToClient&#39;: $ret = pushToClientTask($ws, $data[&#39;key&#39;], $data[&#39;val&#39;]); break;
	}
	//1.7.2以上的版本,在onTask函数中 return字符串,表示将此内容返回给worker进程。worker进程中会触发onFinish函数,表示投递的task已完成。return的变量可以是任意非null的PHP变量
	return $returnContent;
	//1.7.2以前的版本,需要调用swoole_server->finish()函数将结果返回给worker进程
	// $ws->finish($data);
});
 
/**
 * 当worker进程投递的任务在task_worker中完成时,task进程会通过$ws->finish()方法将任务处理的结果发送给worker进程。
 * @param: $ws swoole_websocket_server object
 * @param: $tid task_id
 * @param: $data 任务处理后的结果内容
 * 注意点: task进程的onTask事件中没有调用finish方法或者return结果,worker进程不会触发onFinish
 *        执行onFinish逻辑的worker进程与下发task任务的worker进程是同一个进程
 */
$ws->on(&#39;finish&#39;, function($ws, $tid, $data) {
 
});
 
/**
 * TCP客户端连接关闭后,在worker进程中回调此函数
 * 在函数中可以做一些类似于删除业务中与每个客户端交互时存放的数据的操作
 * @param: $ws swoole_websocket_server object
 * @param: $fd 已关闭的fd interger
 * @param: $rid(可选),来自哪个reactor线程
 * 注意点: 
 * 1. onClose回调函数如果发生了致命错误,会导致连接泄漏。通过netstat命令会看到大量CLOSE_WAIT状态的TCP连接
 * 2. 查看命令netstat -anopc | grep 端口号,可以查看到TCP接收和发送队列是否有堆积以及TCP连接的状态
 * 3. 无论由客户端发起close还是服务器端主动调用$serv->close()关闭连接,都会触发此事件。因此只要连接关闭,就一定会回调此函数
 * 4. 1.7.7+版本以后onClose中依然可以调用connection_info方法获取到连接信息,在onClose回调函数执行完毕后才会调用close关闭TCP连接
 * 5. 这里回调onClose时表示客户端连接已经关闭,所以无需执行$server->close($fd)。代码中执行$serv->close($fd)会抛出PHP错误告警。也就是在onclose中不能再$ws->close()了.
 * 6. swoole-1.9.7版本修改了$reactorId参数,当服务器主动关闭连接时,底层会设置此参数为-1,可以通过判断$reactorId < 0来分辨关闭是由服务器端还是客户端发起的(debug时可以使用)
 */
$ws->on(&#39;close&#39;, function ($ws, $fd) {
	$redis = new Redis();
	$redis->connect(REDIS_HOST, REDIS_PORT);
	$redis->auth(REDIS_PWD);
	$sArr = $redis->sMembers(REDIS_FD_S.$fd);
	if (!empty($sArr)) {
		foreach ((array)$sArr as $key => $sc) {
			$res = $redis->sRem(REDIS_S_FD.$sc, $fd);
			$num = $redis->sCard(REDIS_S_FD.$sc);
			if ($num == &#39;0&#39;) {
				$redis->sRem(REDIS_S_KEY, $sc);
				$redis->hDel(REDIS_ZS_KEY, $sc);
			}
		}
	}
	$redis->del(REDIS_FD_S.$fd);
	$redis->close();
	echo "FD $fd has closed.\n";
});
 
/**
 * 开启swoole_websocket_server服务
 */
$ws->start();
 
 
/**
 * 接受到消息以后进行响应异步任务的执行
 * @param: $ws swoole_websocket_sever object
 * @param: $frame swoole_websocket_frame obejct
 */
function handleClientData($ws, $frame) {
	$data = $frame->data;
	$redis = new Redis();
	$redis->connect(REDIS_HOST, REDIS_PORT);
	$redis->auth(REDIS_PWD);
	$isMembers = $redis->sIsmember(REDIS_S_FD.$sc, $frame->fd);
	if (!$isMembers) {
		$res = $redis->sAdd(REDIS_S_FD.$sc, $frame->fd);
	}
	$redis->sAdd(REDIS_FD_S.$frame->fd, $sc);
	$isMembers = $redis->sIsmember(REDIS_S_KEY, $sc);
	if (!$isMembers) { 
		$redis->sAdd(REDIS_S_KEY, $sc);
	}
}
 
 
/**
 * redis订阅后的回调函数
 * @param: $ins instance实例
 * @param: $pattern 匹配模式
 * @param: $channel 频道名
 * @param: $data 数据
 * 注意点: subscribe和psubscribe两种不同的订阅方式的回调函数的参数个数不一样,后者多了$pattern参数
 */
function sendTask($ins, $pattern, $channel, $data) {
	//满足一些条件后,投递到task进程中进行推送
	$taskData = array(
		&#39;cmd&#39; => &#39;pushToClient&#39;,
		&#39;key&#39; => $sc,
		&#39;val&#39; => $data,
	);
	//请注意,taskwait是同步阻塞的,所以改脚本并不是全异步非阻塞的
	$GLOBALS[&#39;ws&#39;]->taskwait($taskData);
}
 
/**
 * 推送消息到指定的客户端
 * @param: $ws swoole_websocket_server object
 * @param: $sc 股票代码
 * @param: $data 要推送的数据
 */
function pushToClientTask($ws, $sc, $data) {
    $redis = new Redis();
    $redis->connect(REDIS_HOST, REDIS_PORT);
    $redis->auth(REDIS_PWD);
	$fdList = $redis->sMembers(REDIS_S_FD.$sArr[4]);
	if (!empty($fdList)) {
		foreach ((array)$fdList as $fd) {
			$res = $GLOBALS[&#39;ws&#39;]->push($fd, $data);
			echo "FD: $fd push $res.\n";
			if (!$res) { //推送失败,即客户端已经断开连接
				//从该fd订阅的所有股票中删除该fd
				$sArrOfFd = $redis->sMembers(REDIS_FD_S.$fd);
				if (!empty($sArrOfFd)) {
					foreach ((array)$sArrOfFd as $key => $sc) {
						$res = $redis->sRem(REDIS_S_FD.$sc, $fd);
						$num = $redis->sCard(REDIS_S_FD.$sc);
						if ($num == &#39;0&#39;) {
							$redis->sRem(REDIS_S_KEY, $sc);
							$redis->hDel(REDIS_ZS_KEY, $sc);
						}
					}
				}
				$redis->del(REDIS_FD_S.$fd);
			}
		}
	}
    $redis->close();
}

以上是swoole如何实现实时推送的详细内容。更多信息请关注PHP中文网其他相关文章!

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
我该如何为Swoole开源项目做出贡献?我该如何为Swoole开源项目做出贡献?Mar 18, 2025 pm 03:58 PM

本文概述了为Swoole项目做出贡献的方法,包括报告错误,提交功能,编码和改进文档。它讨论了初学者开始贡献的必要技能和步骤,以及如何找到紧迫的是

如何使用自定义模块扩展Swoole?如何使用自定义模块扩展Swoole?Mar 18, 2025 pm 03:57 PM

文章讨论了使用自定义模块,详细的步骤,最佳实践和故障排除扩展swoole。主要重点是增强功能和集成。

如何使用Swoole的异步I/O功能?如何使用Swoole的异步I/O功能?Mar 18, 2025 pm 03:56 PM

本文讨论了在PHP中使用Swoole的异步I/O功能用于高性能应用程序。它涵盖安装,服务器设置和优化策略。单词计数:159

如何配置Swoole的过程隔离?如何配置Swoole的过程隔离?Mar 18, 2025 pm 03:55 PM

文章讨论了配置Swoole的流程隔离,其好处如提高稳定性和安全性以及故障排除方法。

Swoole的反应堆模型如何在引擎盖下工作?Swoole的反应堆模型如何在引擎盖下工作?Mar 18, 2025 pm 03:54 PM

Swoole的反应堆模型使用事件驱动的,非阻滞I/O架构来有效地管理高持续性场景,通过各种技术优化性能。(159个字符)(159个字符)

如何在Swoole中解决连接问题?如何在Swoole中解决连接问题?Mar 18, 2025 pm 03:53 PM

文章讨论了对PHP框架Swoole中的连接问题的故障排除,原因,监视和预防。

我可以使用什么工具来监视Swoole的性能?我可以使用什么工具来监视Swoole的性能?Mar 18, 2025 pm 03:52 PM

本文讨论了监视和优化Swoole的性能的工具和最佳实践,以及针对性能问题的故障排除方法。

如何解决Swoole应用程序中的内存泄漏?如何解决Swoole应用程序中的内存泄漏?Mar 18, 2025 pm 03:51 PM

摘要:本文讨论了通过识别,隔离和固定解决SWOORE应用程序中的内存泄漏,并强调了常见原因,例如不当资源管理和不受管理的Coroutines。 Swoole Tracker和Valgrind等工具

See all articles

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
4 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

SublimeText3 Linux新版

SublimeText3 Linux新版

SublimeText3 Linux最新版

mPDF

mPDF

mPDF是一个PHP库,可以从UTF-8编码的HTML生成PDF文件。原作者Ian Back编写mPDF以从他的网站上“即时”输出PDF文件,并处理不同的语言。与原始脚本如HTML2FPDF相比,它的速度较慢,并且在使用Unicode字体时生成的文件较大,但支持CSS样式等,并进行了大量增强。支持几乎所有语言,包括RTL(阿拉伯语和希伯来语)和CJK(中日韩)。支持嵌套的块级元素(如P、DIV),

Atom编辑器mac版下载

Atom编辑器mac版下载

最流行的的开源编辑器

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中

VSCode Windows 64位 下载

VSCode Windows 64位 下载

微软推出的免费、功能强大的一款IDE编辑器