ホームページ >バックエンド開発 >PHPチュートリアル >PHPにおけるRedisのアプリケーションメッセージングの詳細説明
メッセージングはさまざまなWebサイトで広く使用されており、この機能もWebサイトには不可欠です。この記事では主に、PHP メッセージ パッシングにおける Redis の応用について紹介します。
内容の読み取り
1. 概要
2. 1対1のメッセージング
4. 多対多のメッセージング
1. 概要メッセージングのアプリケーションはさまざまなWebサイトで広く使用されており、この機能もWebサイトには不可欠です。一般的なメッセージング アプリケーションには、Sina Weibo の @me、コメント後のプロンプト、いいねプロンプト、プライベート メッセージ、さらには Weibo で共有される新しいもの、送信された Zhihu メッセージ、Zhihu チーム メッセージなどが含まれます。
2.実装方法
メッセージパッシングとは、2つ以上のクライアントが相互にメッセージを送受信することを意味します。これを実現するには通常 2 つの方法があります:
1 つ目はメッセージ プッシュです。 Redis にはこの組み込みメカニズムがあり、Publish はメッセージをチャネルにプッシュし、チャネルをサブスクライブします。この方法の欠点の 1 つは、受信者が常にオンラインでなければならないことです (つまり、プログラムは現時点では停止できず、監視状態のままです。接続が切断されると、クライアントは情報を失います)
2 つ目はメッセージプル Pick です。 いわゆるメッセージプルとは、クライアントがサーバーに保存されているデータを独自に取得することを意味します。 Redis は内部的にメッセージ取得メカニズムを実装していません。したがって、この機能を実装するには、手動でコードを自分で記述する必要があります。
ここでは、メッセージングをさらに 1 対 1 メッセージングと多対多メッセージング (グループ メッセージング) に細分します。[注: 2 つのクラスのコードは比較的大きいため、折りたたまれています]3. 1 対 1 のメッセージング
例 1: 1 対 1 のメッセージの送信と取得モジュールの要件:
1. 新しいメッセージを送信した連絡先の数を確認する 2. 情報には、メッセージの送信者、時刻、内容が含まれます
3. 以前の古いメッセージを取得する機能 4.メッセージは 7 日間保持でき、期限切れになると受動的に削除がトリガーされます Redis 実装のアイデア: 1. 新しいメッセージと古いメッセージはそれぞれ 2 つのリンクされたリストに保存されます2。配列の形式で保存され、送信者、タイムスタンプ、情報コンテンツが含まれます3. Redis リンクリストにプッシュする前に、データを json 型に変換してから保存する必要があります
4. , rpoplpush を使用して実装する必要があり、読み取られた新しい情報は、古いメッセージのリンク リストにプッシュされます 5. 古いメッセージを取り出すときは、古いメッセージの時刻を比較する必要があります。タイムアウトになった場合は、それ以降のデータをすべて直接削除します (データは時間に応じて 1 つずつリンクされたリストにプッシュされるため、時間順に配置されます) データストレージ構造図:
PHP 実装コード:
#SinglePullMessage.class.php<?php
#单接接收者接收消息
class SinglePullMessage
{
private $redis=''; #存储redis对象
/**
* @desc 构造函数
*
* @param $host string | redis主机
* @param $port int | 端口
*/
public function __construct($host,$port=6379)
{
$this->redis=new Redis();
$this->redis->connect($host,$port);
}
/**
* @desc 发送消息(一个人)
*
* @param $toUser string | 接收人
* @param $messageArr array | 发送的消息数组,包含sender、message、time
*
* @return bool
*/
public function sendSingle($toUser,$messageArr)
{
$json_message=json_encode($messageArr); #编码成json数据
return $this->redis->lpush($toUser,$json_message); #将数据推入链表
}
/**
* @desc 用户获取新消息
*
* @param $user string | 用户名
*
* @return array 返回数组,包含多少个用户发来新消息,以及具体消息
*/
public function getNewMessage($user)
{
#接收新信息数据,并且将数据推入旧信息数据链表中,并且在原链表中删除
$messageArr=array();
while($json_message=$this->redis->rpoplpush($user, 'preMessage_'.$user))
{
$temp=json_decode($json_message); #将json数据变成对象
$messageArr[$temp->sender][]=$temp; #转换成数组信息
}
if($messageArr)
{
$arr['count']=count($messageArr); #统计有多少个用户发来信息
$arr['messageArr']=$messageArr;
return $arr;
}
return false;
}
public function getPreMessage($user)
{
##取出旧消息
$messageArr=array();
$json_pre=$this->redis->lrange('preMessage_'.$user, 0, -1); #一次性将全部旧消息取出来
foreach ($json_pre as $k => $v)
{
$temp=json_decode($v); #json反编码
$timeout=$temp->time+60*60*24*7; #数据过期时间 七天过期
if($timeout<time()) #判断数据是否过期
{
if($k==0) #若是最迟插入的数据都过期了,则将所有数据删除
{
$this->redis->del('preMessage_'.$user);
break;
}
$this->redis->ltrim('preMessage_'.$user, 0, $k); #若检测出有过期的,则将比它之前插入的所有数据删除
break;
}
$messageArr[$temp->sender][]=$temp;
}
return $messageArr;
}
/**
* @desc 消息处理,没什么特别的作用。在这里这是用来处理数组信息,然后将其输出。
*
* @param $arr array | 需要处理的信息数组
*
* @return 返回打印输出
*/
public function dealArr($arr)
{
foreach ($arr as $k => $v)
{
foreach ($v as $k1 => $v2)
{
echo '发送人:'.$v2->sender.' 发送时间:'.date('Y-m-d h:i:s',$v2->time).'<br/>';
echo '消息内容:'.$v2->message.'<br/>';
}
echo "<hr/>";
}
}
}
#build test1.php
include './SinglePullMessage.class.php';
$object=new SinglePullMessage('192.168.95.11');
#发送消息
$sender='boss'; #发送者
$to='jane'; #接收者
$message='How are you'; #信息
$time=time();
$arr=array('sender'=>$sender,'message'=>$message,'time'=>$time);
echo $object->sendSingle($to,$arr);
include './SinglePullMessage.class.php';
$object=new SinglePullMessage('192.168.95.11');
#获取新消息
$arr=$object->getNewMessage('jane');
if($arr)
{
echo $arr['count']."个联系人发来新消息<br/><hr/>";
$object->dealArr($arr['messageArr']);
}
else
echo "无新消息";
3.
#テスト3をビルドします。 phpinclude './SinglePullMessage.class.php';
$object=new SinglePullMessage('192.168.95.11');
#获取旧消息
$arr=$object->getPreMessage('jane');
if($arr)
{
$object->dealArr($arr);
}
else
echo "无旧数据";
例 2: 多対多のメッセージの送信と取得 (つまり、グループ)
モジュール要件:1 . ユーザーは自分でグループを作成し、グループのリーダーになることができます2. グループのリーダーは、他のユーザーをグループのメンバーとして参加させたり、グループをキックしたりすることができます
3. ユーザーは、メッセージを送信することができます4.メッセージのプル5. グループのメッセージの最大容量は 5,000 です6. メンバーは新しいメッセージをプルでき、新しいメッセージの数が表示されます7. メンバーはページ内で以前に読まれたメッセージを取得できます。 。 。 。 。これらの機能を書き留めておきます。必要な場合や練習したい場合は、ミュート、匿名メッセージ送信、ファイル送信などの他の機能を追加できます。 Redis 実装のアイデア:
1、群组的消息以及群组的成员组成采用有序集合进行存储。群组消息有序集合的member存储用户发送的json数据消息,score存储唯一值,将采用原子操作incr获取string中的自增长值进行存储;群组成员有序集合的member存储user,score存储非零数字(在这里这个score意义不大,我的例子代码中使用数字1为群主的score,其他的存储为2。当然这使用这个数据还可以扩展别的功能,例如群组中成员等级)可参考下面数据存储结构简图。
2、用户所加入的群组也是采用有序集合进行存储。其中,member存储群组ID,score存储用户已经获取该群组的最大消息分值(对应群组消息的score值)
3、用户创建群组的时候,通过原子操作incr从而获取一个唯一ID
4、用户在群中发送消息时,也是通过原子操作incr获取一个唯一自增长有序ID
5、在执行incr时,为防止并发导致竞争关系,因此需要进行加锁操作【redis详细锁的讲解可以参考:Redis构建分布式锁http://www.jb51.net/article/109704.htm】
6、创建群组方法简要思路,任何一个用户都可以创建群组聊天,在创建的同时,可以选择时是否添加群组成员(参数通过数组的形式)。创建过程将会为这个群组建立一个群组成员有序集合(群组信息有序集合暂时不创建),接着将群主添加进去,再将群ID添加用户所参加的群组有序集合中。
数据存储结构图:
PHP的代码实现:
#ManyPullMessage.class.php
<?php class ManyPullMessage { private $redis=''; #存储redis对象 /** * @desc 构造函数 * * @param $host string | redis主机 * @param $port int | 端口 */ public function __construct($host,$port=6379) { $this->redis=new Redis(); $this->redis->connect($host,$port); } /** * @desc 用于创建群组的方法,在创建的同时还可以拉人进群组 * * @param $user string | 用户名,创建群组的主人 * @param $addUser array | 其他用户构成的数组 * * @param $lockName string | 锁的名字,用于获取群组ID的时候用 * @return int 返回群组ID */ public function createGroupChat($user, $addUser=array(), $lockName='chatIdLock') { $identifier=$this->getLock($lockName); #获取锁 if($identifier) { $id=$this->redis->incr('groupChatID'); #获取群组ID $this->releaseLock($lockName,$identifier); #释放锁 } else return false; $messageCount=$this->redis->set('countMessage_'.$id, 0); #初始化这个群组消息计数器 #开启非事务型流水线,一次性将所有redis命令传给redis,减少与redis的连接 $pipe=$this->redis->pipeline(); $this->redis->zadd('groupChat_'.$id, 1, $user); #创建群组成员有序集合,并添加群主 #将这个群组添加到user所参加的群组有序集合中 $this->redis->zadd('hasGroupChat_'.$user, 0, $id); foreach ($addUser as $v) #创建群组的同时需要添加的用户成员 { $this->redis->zadd('groupChat_'.$id, 2, $v); $this->redis->zadd('hasGroupChat_'.$v, 0, $id); } $pipe->exec(); return $id; #返回群组ID } /** * @desc 群主主动拉人进群 * * @param $user string | 群主名 * @param $groupChatID int | 群组ID * @param $addMembers array | 需要拉进群的用户 * * @return bool */ public function addMembers($user, $groupChatID, $addMembers=array()) { $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #将groupChatName的群主取出来 if($groupMasterScore==1) #判断user是否是群主 { $pipe=$this->redis->pipeline(); #开启非事务流水线 foreach ($addMembers as $v) { $this->redis->zadd('groupChat_'.$groupChatID, 2, $v); #添加进群 $this->redis->zadd('hasGroupChat_'.$v, 0, $groupChatID); #添加群名到用户的有序集合中 } $pipe->exec(); return true; } return false; } /** * @desc 群主删除成员 * * @param $user string | 群主名 * @param $groupChatID int | 群组ID * @param $delMembers array | 需要删除的成员名字 * * @return bool */ public function delMembers($user, $groupChatID, $delMembers=array()) { $groupMasterScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); if($groupMasterScore==1) #判断user是否是群主 { $pipe=$this->redis->pipeline(); #开启非事务流水线 foreach ($delMembers as $v) { $this->redis->zrem('groupChat_'.$groupChatID, $v); $this->redis->zrem('hasGroupChat_'.$v, $groupChatID); } $pipe->exec(); return true; } return false; } /** * @desc 退出群组 * * @param $user string | 用户名 * @param $groupChatID int | 群组名 */ public function quitGroupChat($user, $groupChatID) { $this->redis->zrem('groupChat_'.$groupChatID, $user); $this->redis->zrem('hasGroupChat_'.$user, $groupChatID); return true; } /** * @desc 发送消息 * * @param $user string | 用户名 * @param $groupChatID int | 群组ID * @param $messageArr array | 包含发送消息的数组 * @param $preLockName string | 群消息锁前缀,群消息锁全名为countLock_群ID * * @return bool */ public function sendMessage($user, $groupChatID, $messageArr, $preLockName='countLock_') { $memberScore=$this->redis->zscore('groupChat_'.$groupChatID, $user); #成员score if($memberScore) { $identifier=$this->getLock($preLockName.$groupChatID); #获取锁 if($identifier) #判断获取锁是否成功 { $messageCount=$this->redis->incr('countMessage_'.$groupChatID); $this->releaseLock($preLockName.$groupChatID,$identifier); #释放锁 } else return false; $json_message=json_encode($messageArr); $this->redis->zadd('groupChatMessage_'.$groupChatID, $messageCount, $json_message); $count=$this->redis->zcard('groupChatMessage_'.$groupChatID); #查看信息量大小 if($count>5000) #判断数据量有没有达到5000条 { #数据量超5000,则需要清除旧数据 $start=5000-$count; $this->redis->zremrangebyrank('groupChatMessage_'.$groupChatID, $start, $count); } return true; } return false; } /** * @desc 获取新信息 * * @param $user string | 用户名 * * @return 成功则放回json数据数组,无新信息返回false */ public function getNewMessage($user) { $arrID=$this->redis->zrange('hasGroupChat_'.$user, 0, -1, 'withscores'); #获取用户拥有的群组ID $json_message=array(); #初始化 foreach ($arrID as $k => $v) #遍历循环所有群组,查看是否有新消息 { $messageCount=$this->redis->get('countMessage_'.$k); #群组最大信息分值数 if($messageCount>$v) #判断用户是否存在未读新消息 { $json_message[$k]['message']=$this->redis->zrangebyscore('groupChatMessage_'.$k, $v+1, $messageCount); $json_message[$k]['count']=count($json_message[$k]['message']); #统计新消息数量 $this->redis->zadd('hasGroupChat_'.$user, $messageCount, $k); #更新已获取消息 } } if($json_message) return $json_message; return false; } /** * @desc 分页获取群组信息 * * @param $user string | 用户名 * @param $groupChatID int | 群组ID * @param $page int | 第几页 * @param $size int | 每页多少条数据 * * @return 成功返回json数据,失败返回false */ public function getPartMessage($user, $groupChatID, $page=1, $size=10) { $start=$page*$size-$size; #开始截取数据位置 $stop=$page*$size-1; #结束截取数据位置 $json_message=$this->redis->zrevrange('groupChatMessage_'.$groupChatID, $start, $stop); if($json_message) return $json_message; return false; } /** * @desc 加锁方法 * * @param $lockName string | 锁的名字 * @param $timeout int | 锁的过期时间 * * @return 成功返回identifier/失败返回false */ public function getLock($lockName, $timeout=2) { $identifier=uniqid(); #获取唯一标识符 $timeout=ceil($timeout); #确保是整数 $end=time()+$timeout; while(time()<$end) #循环获取锁 { /* #这里的set操作可以等同于下面那个if操作,并且可以减少一次与redis通讯 if($this->redis->set($lockName, $identifier array('nx', 'ex'=>$timeout))) return $identifier; */ if($this->redis->setnx($lockName, $identifier)) #查看$lockName是否被上锁 { $this->redis->expire($lockName, $timeout); #为$lockName设置过期时间 return $identifier; #返回一维标识符 } elseif ($this->redis->ttl($lockName)===-1) { $this->redis->expire($lockName, $timeout); #检测是否有设置过期时间,没有则加上 } usleep(0.001); #停止0.001ms } return false; } /** * @desc 释放锁 * * @param $lockName string | 锁名 * @param $identifier string | 锁的唯一值 * * @param bool */ public function releaseLock($lockName,$identifier) { if($this->redis->get($lockName)==$identifier) #判断是锁有没有被其他客户端修改 { $this->redis->multi(); $this->redis->del($lockName); #释放锁 $this->redis->exec(); return true; } else { return false; #其他客户端修改了锁,不能删除别人的锁 } } } ?>
测试:
1、建立createGroupChat.php(测试创建群组功能)
执行代码并创建568、569群组(群主为jack)
include './ManyPullMessage.class.php'; $object=new ManyPullMessage('192.168.95.11'); #创建群组 $user='jack'; $arr=array('jane1','jane2'); $a=$object->createGroupChat($user,$arr); echo "<pre class="brush:php;toolbar:false">"; print_r($a); echo "";die;
2、建立addMembers.php(测试添加成员功能)
执行代码并添加新成员
include './ManyPullMessage.class.php'; $object=new ManyPullMessage('192.168.95.11'); $b=$object->addMembers('jack','568',array('jane1','jane2','jane3','jane4')); echo "<pre class="brush:php;toolbar:false">"; print_r($b); echo "";die;
3、建立delete.php(测试群主删除成员功能)
include './ManyPullMessage.class.php'; $object=new ManyPullMessage('192.168.95.11'); #群主删除成员 $c=$object->delMembers('jack', '568', array('jane1','jane4')); echo "<pre class="brush:php;toolbar:false">"; print_r($c); echo "";die;
4、建立sendMessage.php(测试发送消息功能)
多执行几遍,568、569都发几条
include './ManyPullMessage.class.php'; $object=new ManyPullMessage('192.168.95.11'); #发送消息 $user='jane2'; $message='go go go'; $groupChatID=568; $arr=array('sender'=>$user, 'message'=>$message, 'time'=>time()); $d=$object->sendMessage($user,$groupChatID,$arr); echo "<pre class="brush:php;toolbar:false">"; print_r($d); echo "";die;
5、建立getNewMessage.php(测试用户获取新消息功能)
include './ManyPullMessage.class.php'; $object=new ManyPullMessage('192.168.95.11'); #用户获取新消息 $e=$object->getNewMessage('jane2'); echo "<pre class="brush:php;toolbar:false">"; print_r($e); echo "";die;
6、建立getPartMessage.php(测试用户获取某个群组部分消息)
(多发送几条消息,用于测试。568中共18条数据)
include './ManyPullMessage.class.php'; $object=new ManyPullMessage('192.168.95.11'); #用户获取某个群组部分消息 $f=$object->getPartMessage('jane2', 568, 1, 10); echo "<pre class="brush:php;toolbar:false">"; print_r($f); echo "";die;
page=1,size=10
page=2,size=10
测试完毕,还需要别的功能可以自己进行修改添加测试。
以上就是本文的全部内容,希望对大家的学习有所帮助。
相关推荐:
PHP を使用して Redis キューでの e コマース注文の受信を自動的に確認する方法
以上がPHPにおけるRedisのアプリケーションメッセージングの詳細説明の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。