基于 Hyperf+ WebSocket +RabbitMQ 实现的一个简单大屏幕的消息推送。
思路
利用 WebSocket 协议让客户端和服务器端保持有状态的长链接,
保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。
WebSocket 服务
composer require hyperf/websocket-server
配置文件 [config/autoload/server.php]
<?php return [ 'mode' => SWOOLE_PROCESS, 'servers' => [ [ 'name' => 'http', 'type' => Server::SERVER_HTTP, 'host' => '0.0.0.0', 'port' => 11111, 'sock_type' => SWOOLE_SOCK_TCP, 'callbacks' => [ SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'], ], ], [ 'name' => 'ws', 'type' => Server::SERVER_WEBSOCKET, 'host' => '0.0.0.0', 'port' => 12222, 'sock_type' => SWOOLE_SOCK_TCP, 'callbacks' => [ SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'], SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'], SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'], ], ], ],
WebSocket 服务器端代码示例
<?php declare(strict_types=1); /** * This file is part of Hyperf. * * @link https://www.hyperf.io * @document https://doc.hyperf.io * @contact group@hyperf.io * @license https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE */ namespace App\Controller; use Hyperf\Contract\OnCloseInterface; use Hyperf\Contract\OnMessageInterface; use Hyperf\Contract\OnOpenInterface; use Swoole\Http\Request; use Swoole\Server; use Swoole\Websocket\Frame; use Swoole\WebSocket\Server as WebSocketServer; class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface { /** * 发送消息 * @param WebSocketServer $server * @param Frame $frame */ public function onMessage(WebSocketServer $server, Frame $frame): void { //心跳刷新缓存 $redis = $this->container->get(\Redis::class); //获取所有的客户端id $fdList = $redis->sMembers('websocket_sjd_1'); //如果当前客户端在客户端集合中,就刷新 if (in_array($frame->fd, $fdList)) { $redis->sAdd('websocket_sjd_1', $frame->fd); $redis->expire('websocket_sjd_1', 7200); } $server->push($frame->fd, 'Recv: ' . $frame->data); } /** * 客户端失去链接 * @param Server $server * @param int $fd * @param int $reactorId */ public function onClose(Server $server, int $fd, int $reactorId): void { //删掉客户端id $redis = $this->container->get(\Redis::class); //移除集合中指定的value $redis->sRem('websocket_sjd_1', $fd); var_dump('closed'); } /** * 客户端链接 * @param WebSocketServer $server * @param Request $request */ public function onOpen(WebSocketServer $server, Request $request): void { //保存客户端id $redis = $this->container->get(\Redis::class); $res1 = $redis->sAdd('websocket_sjd_1', $request->fd); var_dump($res1); $res = $redis->expire('websocket_sjd_1', 7200); var_dump($res); $server->push($request->fd, 'Opened'); } }
WebSocket 前端代码
function WebSocketTest() { if ("WebSocket" in window) { console.log("您的浏览器支持 WebSocket!"); var num = 0 // 打开一个 web socket var ws = new WebSocket("ws://127.0.0.1:12222"); ws.onopen = function () { // Web Socket 已连接上,使用 send() 方法发送数据 //alert("数据发送中..."); //ws.send("发送数据"); }; window.setInterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开 var ping = {"type": "ping"}; ws.send(JSON.stringify(ping)); }, 5000); ws.onmessage = function (evt) { var d = JSON.parse(evt.data); console.log(d); if (d.code == 300) { $(".address").text(d.address) } if (d.code == 200) { var v = d.data console.log(v); num++ var str = `<div class="item"> <p>${v.recordOutTime}</p> <p>${v.userOutName}</p> <p>${v.userOutNum}</p> <p>${v.doorOutName}</p> </div>` $(".tableHead").after(str) if (num > 7) { num-- $(".table .item:nth-last-child(1)").remove() } } }; ws.error = function (e) { console.log(e) alert(e) } ws.onclose = function () { // 关闭 websocket alert("连接已关闭..."); }; } else { alert("您的浏览器不支持 WebSocket!"); } }
AMQP 组件
composer require hyperf/amqp
配置文件 [config/autoload/amqp.php]
<?php return [ 'default' => [ 'host' => 'localhost', 'port' => 5672, 'user' => 'guest', 'password' => 'guest', 'vhost' => '/', 'pool' => [ 'min_connections' => 1, 'max_connections' => 10, 'connect_timeout' => 10.0, 'wait_timeout' => 3.0, 'heartbeat' => -1, ], 'params' => [ 'insist' => false, 'login_method' => 'AMQPLAIN', 'login_response' => null, 'locale' => 'en_US', 'connection_timeout' => 3.0, 'read_write_timeout' => 6.0, 'context' => null, 'keepalive' => false, 'heartbeat' => 3, ], ], ];
MQ 消费者代码
<?php declare(strict_types=1); namespace App\Amqp\Consumer; use Hyperf\Amqp\Annotation\Consumer; use Hyperf\Amqp\Message\ConsumerMessage; use Hyperf\Amqp\Result; use Hyperf\Server\Server; use Hyperf\Server\ServerFactory; /** * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1) */ class DemoConsumer extends ConsumerMessage { /** * rabbmitMQ消费端代码 * @param $data * @return string */ public function consume($data): string { print_r($data); //获取集合中所有的value $redis = $this->container->get(\Redis::class); $fdList=$redis->sMembers('websocket_sjd_1'); $server=$this->container->get(ServerFactory::class)->getServer()->getServer(); foreach($fdList as $key=>$v){ if(!empty($v)){ $server->push((int)$v, $data); } } return Result::ACK; } }
控制器代码
/** * test * @return array */ public function test() { $data = array( 'code' => 200, 'data' => [ 'userOutName' => 'ccflow', 'userOutNum' => '9999', 'recordOutTime' => date("Y-m-d H:i:s", time()), 'doorOutName' => '教师公寓', ] ); $data = \GuzzleHttp\json_encode($data); $message = new DemoProducer($data); $producer = ApplicationContext::getContainer()->get(Producer::class); $result = $producer->produce($message); var_dump($result); $user = $this->request->input('user', 'Hyperf'); $method = $this->request->getMethod(); return [ 'method' => $method, 'message' => "{$user}.", ]; }
最终效果
推荐教程:《PHP教程》
以上是基于 Hyperf + RabbitMQ + WebSocket 实现消息推送的详细内容。更多信息请关注PHP中文网其他相关文章!

PHP和Python各有优势,选择应基于项目需求。1.PHP适合web开发,语法简单,执行效率高。2.Python适用于数据科学和机器学习,语法简洁,库丰富。

PHP不是在消亡,而是在不断适应和进化。1)PHP从1994年起经历多次版本迭代,适应新技术趋势。2)目前广泛应用于电子商务、内容管理系统等领域。3)PHP8引入JIT编译器等功能,提升性能和现代化。4)使用OPcache和遵循PSR-12标准可优化性能和代码质量。

PHP的未来将通过适应新技术趋势和引入创新特性来实现:1)适应云计算、容器化和微服务架构,支持Docker和Kubernetes;2)引入JIT编译器和枚举类型,提升性能和数据处理效率;3)持续优化性能和推广最佳实践。

在PHP中,trait适用于需要方法复用但不适合使用继承的情况。1)trait允许在类中复用方法,避免多重继承复杂性。2)使用trait时需注意方法冲突,可通过insteadof和as关键字解决。3)应避免过度使用trait,保持其单一职责,以优化性能和提高代码可维护性。

依赖注入容器(DIC)是一种管理和提供对象依赖关系的工具,用于PHP项目中。DIC的主要好处包括:1.解耦,使组件独立,代码易维护和测试;2.灵活性,易替换或修改依赖关系;3.可测试性,方便注入mock对象进行单元测试。

SplFixedArray在PHP中是一种固定大小的数组,适用于需要高性能和低内存使用量的场景。1)它在创建时需指定大小,避免动态调整带来的开销。2)基于C语言数组,直接操作内存,访问速度快。3)适合大规模数据处理和内存敏感环境,但需谨慎使用,因其大小固定。

PHP通过$\_FILES变量处理文件上传,确保安全性的方法包括:1.检查上传错误,2.验证文件类型和大小,3.防止文件覆盖,4.移动文件到永久存储位置。

JavaScript中处理空值可以使用NullCoalescingOperator(??)和NullCoalescingAssignmentOperator(??=)。1.??返回第一个非null或非undefined的操作数。2.??=将变量赋值为右操作数的值,但前提是该变量为null或undefined。这些操作符简化了代码逻辑,提高了可读性和性能。


热AI工具

Undresser.AI Undress
人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover
用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool
免费脱衣服图片

Clothoff.io
AI脱衣机

AI Hentai Generator
免费生成ai无尽的。

热门文章

热工具

ZendStudio 13.5.1 Mac
功能强大的PHP集成开发环境

Atom编辑器mac版下载
最流行的的开源编辑器

安全考试浏览器
Safe Exam Browser是一个安全的浏览器环境,用于安全地进行在线考试。该软件将任何计算机变成一个安全的工作站。它控制对任何实用工具的访问,并防止学生使用未经授权的资源。

SublimeText3 Linux新版
SublimeText3 Linux最新版

SublimeText3汉化版
中文版,非常好用