>  기사  >  백엔드 개발  >  PHP Redis의 애플리케이션 메시징에 대한 자세한 설명

PHP Redis의 애플리케이션 메시징에 대한 자세한 설명

墨辰丷
墨辰丷원래의
2018-05-24 15:25:521404검색

메시징은 다양한 웹사이트에서 널리 사용되며, 이 기능은 웹사이트에도 필수적입니다. 이 기사에서는 주로 PHP 메시지 전달에 Redis를 적용하는 방법을 소개합니다.

콘텐츠 읽기

1. 요약

2. 구현 방법

3. 4. 더 많은 메시지

1. 요약메시징의 응용은 다양한 웹사이트에서 널리 사용되고 있으며, 이 기능은 웹사이트에도 필수적입니다. 일반적인 메시징 애플리케이션에는 Sina Weibo의 @me, 프롬프트, 비공개 메시지, Weibo에서 공유된 비공개 메시지 및 전송된 Zhihu 메시지, Zhihu 팀 메시지 등의 새로운 메시지가 포함됩니다.

2. 구현 방법

메시지 전달은 둘 이상의 클라이언트가 서로 메시지를 보내고 받는 것을 의미합니다.

이를 달성하는 방법에는 일반적으로 두 가지가 있습니다.

첫 번째는 메시지 푸시입니다. Redis에는 채널에 메시지를 푸시하고 채널을 구독하기 위해 게시하는 메커니즘이 내장되어 있습니다. 이 방법의 한 가지 단점은 수신자가 항상 온라인 상태인지 확인해야 한다는 것입니다(즉, 현재 프로그램을 중지할 수 없으며 모니터링 상태를 유지합니다. 클라이언트 연결이 끊어지면 클라이언트는 정보를 잃게 됩니다)

두 번째는 Pick Pull 메시지입니다. 소위 메시지 풀링은 클라이언트가 서버에 저장된 데이터를 독립적으로 얻는 것을 의미합니다. Redis는 내부적으로 메시지 풀링 메커니즘을 구현하지 않습니다. 따라서 이 기능을 구현하려면 수동으로 코드를 직접 작성해야 합니다.

여기에서는 메시징을 일대일 메시징, 다대다 메시징(그룹 메시징)으로 더 세분화합니다. [참고: 두 클래스의 코드가 상대적으로 크기 때문에 접혀 있습니다.]

3. 일대일 메시징

예 1: 일대일 메시지 전송 및 획득

모듈 요구 사항: 1. 새 메시지를 보낸 사람 수 확인 2 정보에는 메시지의 보낸 사람, 시간 및 내용이 포함됩니다.

3. 이전 메시지를 가져오는 기능

4. 메시지는 7일 동안 보관할 수 있으며, 만료되면 수동적으로 삭제됩니다

Redis 구현 아이디어:

1 새 메시지와 이전 메시지는 각각 두 개의 연결 목록에 저장됩니다2. 배열 형식으로 저장되며 보내는 사람, 타임스탬프, 정보 콘텐츠가 포함됩니다 3. Redis 연결 목록에 푸시하기 전에 데이터를 json 유형으로 변환한 다음 저장해야 합니다

4. , rpoplpush를 사용하여 이를 구현해야 합니다. 메시지는 이전 메시지 연결 목록에 푸시됩니다.

5. 이전 메시지를 꺼낼 때 시간이 초과되면 이전 메시지의 시간을 비교해야 합니다. 이후의 모든 데이터는 직접적으로(시간에 따라 하나씩 Linked List에 푸시되기 때문에) 시간순으로 정렬됩니다)

데이터 저장 구조도:

PHP 구현 코드:

#SinglePullMessage.class.php

<?php
#单接接收者接收消息
class SinglePullMessage
{
 private $redis=&#39;&#39;; #存储redis对象
 /**
 * @desc 构造函数
 * 
 * @param $host string | redis主机
 * @param $port int | 端口
 */
 public function __construct($host,$port=6379)
 {
 $this->redis=new Redis();
 $this->redis->connect($host,$port);
 } 
 /**
 * @desc 发送消息(一个人)
 * 
 * @param $toUser string | 接收人
 * @param $messageArr array | 发送的消息数组,包含sender、message、time 
 *
 * @return bool
 */
 public function sendSingle($toUser,$messageArr)
 {
 $json_message=json_encode($messageArr); #编码成json数据
 return $this->redis->lpush($toUser,$json_message); #将数据推入链表 
 }
 /**
 * @desc 用户获取新消息
 *
 * @param $user string | 用户名
 *
 * @return array 返回数组,包含多少个用户发来新消息,以及具体消息
 */
 public function getNewMessage($user)
 {
 #接收新信息数据,并且将数据推入旧信息数据链表中,并且在原链表中删除
 $messageArr=array();
 while($json_message=$this->redis->rpoplpush($user, &#39;preMessage_&#39;.$user))
 {
  $temp=json_decode($json_message); #将json数据变成对象
  $messageArr[$temp->sender][]=$temp; #转换成数组信息
 }
 if($messageArr)
 {
  $arr[&#39;count&#39;]=count($messageArr); #统计有多少个用户发来信息
  $arr[&#39;messageArr&#39;]=$messageArr;
  return $arr;
 }
 return false;
 }
 public function getPreMessage($user)
 {
 ##取出旧消息
 $messageArr=array();
 $json_pre=$this->redis->lrange(&#39;preMessage_&#39;.$user, 0, -1); #一次性将全部旧消息取出来
 foreach ($json_pre as $k => $v) 
 {
  $temp=json_decode($v);  #json反编码
  $timeout=$temp->time+60*60*24*7; #数据过期时间 七天过期
  if($timeout<time())  #判断数据是否过期
  {
  if($k==0)   #若是最迟插入的数据都过期了,则将所有数据删除
  {
   $this->redis->del(&#39;preMessage_&#39;.$user);
   break;
  }
  $this->redis->ltrim(&#39;preMessage_&#39;.$user, 0, $k); #若检测出有过期的,则将比它之前插入的所有数据删除
  break;
  }
  $messageArr[$temp->sender][]=$temp;
 }
 return $messageArr;
 }
 /**
 * @desc 消息处理,没什么特别的作用。在这里这是用来处理数组信息,然后将其输出。 
 *
 * @param $arr array | 需要处理的信息数组
 *
 * @return 返回打印输出
 */
 public function dealArr($arr)
 {
 foreach ($arr as $k => $v) 
 {
  foreach ($v as $k1 => $v2) 
  {
  echo &#39;发送人:&#39;.$v2->sender.&#39; 发送时间:&#39;.date(&#39;Y-m-d h:i:s&#39;,$v2->time).&#39;<br/>&#39;;
  echo &#39;消息内容:&#39;.$v2->message.&#39;<br/>&#39;;
  }
  echo "<hr/>";
 }
 }
}

테스트:

1 메시지 보내기

#build test1.php

include &#39;./SinglePullMessage.class.php&#39;;
$object=new SinglePullMessage(&#39;192.168.95.11&#39;);
#发送消息
$sender=&#39;boss&#39;; #发送者
$to=&#39;jane&#39;;  #接收者
$message=&#39;How are you&#39;; #信息
$time=time();
$arr=array(&#39;sender&#39;=>$sender,&#39;message&#39;=>$message,&#39;time&#39;=>$time);
echo $object->sendSingle($to,$arr);

2 새로 구매하세요. 메시지

#build test2.php

include &#39;./SinglePullMessage.class.php&#39;;
$object=new SinglePullMessage(&#39;192.168.95.11&#39;);
#获取新消息
$arr=$object->getNewMessage(&#39;jane&#39;);
if($arr)
{
 echo $arr[&#39;count&#39;]."个联系人发来新消息<br/><hr/>";
 $object->dealArr($arr[&#39;messageArr&#39;]); 
}
else
 echo "无新消息";

방문 결과:

3. 오래된 메시지 가져오기

#Build test3.php

include &#39;./SinglePullMessage.class.php&#39;;
$object=new SinglePullMessage(&#39;192.168.95.11&#39;);
#获取旧消息
$arr=$object->getPreMessage(&#39;jane&#39;);
if($arr)
{
 $object->dealArr($arr);
}
else
 echo "无旧数据";

4. 다대다 메시징

예 2: 다대다 메시지 보내기 및 받기(예: 그룹)

모듈 요구 사항:1. 사용자는 스스로 그룹을 만들고 그룹이 될 수 있습니다. 리더2. 그룹 리더는 사람들을 그룹 구성원으로 데려오고 사람들을 추방할 수 있습니다.

3. 사용자는 그룹에서 직접 탈퇴할 수 있습니다.

4. 각 구성원은 메시지를 가져올 수 있습니다

5. 그룹의 메시지는 5,000

6개입니다. 회원은 새 메시지를 가져올 수 있으며 새 메시지가 몇 개인지 확인할 수 있습니다.

7. 회원은 페이지에서 이전에 읽은 메시지를 받을 수 있습니다

. . . . . 연습이 필요하거나 연습하고 싶은 학생들은 음소거, 익명 메시지 보내기, 파일 보내기 등과 같은 다른 기능을 추가할 수 있습니다.

Redis 구현 아이디어:

1、群组的消息以及群组的成员组成采用有序集合进行存储。群组消息有序集合的member存储用户发送的json数据消息,score存储唯一值,将采用原子操作incr获取string中的自增长值进行存储;群组成员有序集合的member存储user,score存储非零数字(在这里这个score意义不大,我的例子代码中使用数字1为群主的score,其他的存储为2。当然这使用这个数据还可以扩展别的功能,例如群组中成员等级)可参考下面数据存储结构简图。

2、用户所加入的群组也是采用有序集合进行存储。其中,member存储群组ID,score存储用户已经获取该群组的最大消息分值(对应群组消息的score值)

3、用户创建群组的时候,通过原子操作incr从而获取一个唯一ID

4、用户在群中发送消息时,也是通过原子操作incr获取一个唯一自增长有序ID

5、在执行incr时,为防止并发导致竞争关系,因此需要进行加锁操作【redis详细锁的讲解可以参考:Redis构建分布式锁http://www.jb51.net/article/109704.htm】

6、创建群组方法简要思路,任何一个用户都可以创建群组聊天,在创建的同时,可以选择时是否添加群组成员(参数通过数组的形式)。创建过程将会为这个群组建立一个群组成员有序集合(群组信息有序集合暂时不创建),接着将群主添加进去,再将群ID添加用户所参加的群组有序集合中。

数据存储结构图:

PHP的代码实现:

#ManyPullMessage.class.php

<?php
class ManyPullMessage
{
 private $redis=&#39;&#39;; #存储redis对象
 /**
 * @desc 构造函数
 * 
 * @param $host string | redis主机
 * @param $port int | 端口
 */
 public function __construct($host,$port=6379)
 {
 $this->redis=new Redis();
 $this->redis->connect($host,$port);
 } 
 /**
 * @desc 用于创建群组的方法,在创建的同时还可以拉人进群组
 * 
 * @param $user string | 用户名,创建群组的主人
 * @param $addUser array | 其他用户构成的数组
 *
 * @param $lockName string | 锁的名字,用于获取群组ID的时候用
 * @return int 返回群组ID
 */
 public function createGroupChat($user, $addUser=array(), $lockName=&#39;chatIdLock&#39;)
 {
 $identifier=$this->getLock($lockName); #获取锁
 if($identifier)
 {
  $id=$this->redis->incr(&#39;groupChatID&#39;); #获取群组ID
  $this->releaseLock($lockName,$identifier); #释放锁
 }
 else
  return false;
 $messageCount=$this->redis->set(&#39;countMessage_&#39;.$id, 0); #初始化这个群组消息计数器
 #开启非事务型流水线,一次性将所有redis命令传给redis,减少与redis的连接
 $pipe=$this->redis->pipeline(); 
 $this->redis->zadd(&#39;groupChat_&#39;.$id, 1, $user); #创建群组成员有序集合,并添加群主
 #将这个群组添加到user所参加的群组有序集合中
 $this->redis->zadd(&#39;hasGroupChat_&#39;.$user, 0, $id); 
 foreach ($addUser as $v) #创建群组的同时需要添加的用户成员
 {
  $this->redis->zadd(&#39;groupChat_&#39;.$id, 2, $v);
  $this->redis->zadd(&#39;hasGroupChat_&#39;.$v, 0, $id);
 }
 $pipe->exec();
 return $id; #返回群组ID
 }
 /**
 * @desc 群主主动拉人进群
 *
 * @param $user string | 群主名
 * @param $groupChatID int | 群组ID
 * @param $addMembers array | 需要拉进群的用户
 *
 * @return bool
 */
 public function addMembers($user, $groupChatID, $addMembers=array())
 {
 $groupMasterScore=$this->redis->zscore(&#39;groupChat_&#39;.$groupChatID, $user); #将groupChatName的群主取出来
 if($groupMasterScore==1) #判断user是否是群主
 {
  $pipe=$this->redis->pipeline(); #开启非事务流水线
  foreach ($addMembers as $v) 
  {
  $this->redis->zadd(&#39;groupChat_&#39;.$groupChatID, 2, $v);   #添加进群
  $this->redis->zadd(&#39;hasGroupChat_&#39;.$v, 0, $groupChatID); #添加群名到用户的有序集合中
  }
  $pipe->exec();
  return true;
 }
 return false;
 }
 /**
 * @desc 群主删除成员
 *
 * @param $user string | 群主名
 * @param $groupChatID int | 群组ID
 * @param $delMembers array | 需要删除的成员名字
 *
 * @return bool
 */
 public function delMembers($user, $groupChatID, $delMembers=array())
 {
 $groupMasterScore=$this->redis->zscore(&#39;groupChat_&#39;.$groupChatID, $user); 
 if($groupMasterScore==1) #判断user是否是群主
 {
  $pipe=$this->redis->pipeline(); #开启非事务流水线
  foreach ($delMembers as $v) 
  {
  $this->redis->zrem(&#39;groupChat_&#39;.$groupChatID, $v);   
  $this->redis->zrem(&#39;hasGroupChat_&#39;.$v, $groupChatID); 
  }
  $pipe->exec();
  return true;
 }
 return false;
 }
 /**
 * @desc 退出群组
 *
 * @param $user string | 用户名
 * @param $groupChatID int | 群组名
 */
 public function quitGroupChat($user, $groupChatID)
 {
 $this->redis->zrem(&#39;groupChat_&#39;.$groupChatID, $user);
 $this->redis->zrem(&#39;hasGroupChat_&#39;.$user, $groupChatID);
 return true;
 }
 /**
 * @desc 发送消息
 *
 * @param $user string | 用户名
 * @param $groupChatID int | 群组ID
 * @param $messageArr array | 包含发送消息的数组
 * @param $preLockName string | 群消息锁前缀,群消息锁全名为countLock_群ID
 *
 * @return bool
 */
 public function sendMessage($user, $groupChatID, $messageArr, $preLockName=&#39;countLock_&#39;)
 {
 $memberScore=$this->redis->zscore(&#39;groupChat_&#39;.$groupChatID, $user); #成员score
 if($memberScore)
 {
  $identifier=$this->getLock($preLockName.$groupChatID); #获取锁
  if($identifier) #判断获取锁是否成功
  {
  $messageCount=$this->redis->incr(&#39;countMessage_&#39;.$groupChatID);
  $this->releaseLock($preLockName.$groupChatID,$identifier); #释放锁
  }
  else
  return false;
  $json_message=json_encode($messageArr);
  $this->redis->zadd(&#39;groupChatMessage_&#39;.$groupChatID, $messageCount, $json_message);
  $count=$this->redis->zcard(&#39;groupChatMessage_&#39;.$groupChatID); #查看信息量大小
  if($count>5000) #判断数据量有没有达到5000条
  { #数据量超5000,则需要清除旧数据
  $start=5000-$count;
  $this->redis->zremrangebyrank(&#39;groupChatMessage_&#39;.$groupChatID, $start, $count);
  }
  return true;
 }
 return false;
 }
 /**
 * @desc 获取新信息
 *
 * @param $user string | 用户名
 *
 * @return 成功则放回json数据数组,无新信息返回false
 */
 public function getNewMessage($user)
 {
 $arrID=$this->redis->zrange(&#39;hasGroupChat_&#39;.$user, 0, -1, &#39;withscores&#39;); #获取用户拥有的群组ID
 $json_message=array(); #初始化
 foreach ($arrID as $k => $v) #遍历循环所有群组,查看是否有新消息
 {
  $messageCount=$this->redis->get(&#39;countMessage_&#39;.$k); #群组最大信息分值数
  if($messageCount>$v) #判断用户是否存在未读新消息
  {
  $json_message[$k][&#39;message&#39;]=$this->redis->zrangebyscore(&#39;groupChatMessage_&#39;.$k, $v+1, $messageCount);
  $json_message[$k][&#39;count&#39;]=count($json_message[$k][&#39;message&#39;]); #统计新消息数量
  $this->redis->zadd(&#39;hasGroupChat_&#39;.$user, $messageCount, $k); #更新已获取消息
  } 
 }
 if($json_message)
  return $json_message;
 return false;
 }
 /**
 * @desc 分页获取群组信息
 *
 * @param $user string | 用户名 
 * @param $groupChatID int | 群组ID
 * @param $page int | 第几页
 * @param $size int | 每页多少条数据
 *
 * @return 成功返回json数据,失败返回false
 */
 public function getPartMessage($user, $groupChatID, $page=1, $size=10)
 {
 $start=$page*$size-$size; #开始截取数据位置
 $stop=$page*$size-1; #结束截取数据位置
 $json_message=$this->redis->zrevrange(&#39;groupChatMessage_&#39;.$groupChatID, $start, $stop);
 if($json_message)
  return $json_message;
 return false;
 }
 /**
 * @desc 加锁方法
 *
 * @param $lockName string | 锁的名字
 * @param $timeout int | 锁的过期时间
 *
 * @return 成功返回identifier/失败返回false
 */
 public function getLock($lockName, $timeout=2)
 {
 $identifier=uniqid(); #获取唯一标识符
 $timeout=ceil($timeout); #确保是整数
 $end=time()+$timeout;
 while(time()<$end)  #循环获取锁
 {
  /*
  #这里的set操作可以等同于下面那个if操作,并且可以减少一次与redis通讯
  if($this->redis->set($lockName, $identifier array(&#39;nx&#39;, &#39;ex&#39;=>$timeout)))
  return $identifier;
  */
  if($this->redis->setnx($lockName, $identifier)) #查看$lockName是否被上锁
  {
  $this->redis->expire($lockName, $timeout); #为$lockName设置过期时间
  return $identifier;    #返回一维标识符
  }
  elseif ($this->redis->ttl($lockName)===-1) 
  {
  $this->redis->expire($lockName, $timeout); #检测是否有设置过期时间,没有则加上
  }
  usleep(0.001);  #停止0.001ms
 }
 return false;
 }
 /**
 * @desc 释放锁
 *
 * @param $lockName string | 锁名
 * @param $identifier string | 锁的唯一值
 *
 * @param bool
 */
 public function releaseLock($lockName,$identifier)
 {
 if($this->redis->get($lockName)==$identifier) #判断是锁有没有被其他客户端修改
 { 
  $this->redis->multi();
  $this->redis->del($lockName); #释放锁
  $this->redis->exec();
  return true;
 }
 else
 {
  return false; #其他客户端修改了锁,不能删除别人的锁
 }
 }

}
?>

测试:

1、建立createGroupChat.php(测试创建群组功能)

执行代码并创建568、569群组(群主为jack)

include &#39;./ManyPullMessage.class.php&#39;;
$object=new ManyPullMessage(&#39;192.168.95.11&#39;);
#创建群组
$user=&#39;jack&#39;;
$arr=array(&#39;jane1&#39;,&#39;jane2&#39;);
$a=$object->createGroupChat($user,$arr);
echo "<pre class="brush:php;toolbar:false">";
print_r($a);
echo "
";die;

2、建立addMembers.php(测试添加成员功能)

执行代码并添加新成员

 include &#39;./ManyPullMessage.class.php&#39;;
 $object=new ManyPullMessage(&#39;192.168.95.11&#39;);
 $b=$object->addMembers(&#39;jack&#39;,&#39;568&#39;,array(&#39;jane1&#39;,&#39;jane2&#39;,&#39;jane3&#39;,&#39;jane4&#39;));
 echo "<pre class="brush:php;toolbar:false">";
 print_r($b);
 echo "
";die;

3、建立delete.php(测试群主删除成员功能)

include &#39;./ManyPullMessage.class.php&#39;;
$object=new ManyPullMessage(&#39;192.168.95.11&#39;);
#群主删除成员
$c=$object->delMembers(&#39;jack&#39;, &#39;568&#39;, array(&#39;jane1&#39;,&#39;jane4&#39;));
echo "<pre class="brush:php;toolbar:false">";
print_r($c);
echo "
";die;

4、建立sendMessage.php(测试发送消息功能)

多执行几遍,568、569都发几条

include &#39;./ManyPullMessage.class.php&#39;;
$object=new ManyPullMessage(&#39;192.168.95.11&#39;);
#发送消息
$user=&#39;jane2&#39;;
$message=&#39;go go go&#39;;
$groupChatID=568;
$arr=array(&#39;sender&#39;=>$user, &#39;message&#39;=>$message, &#39;time&#39;=>time());
$d=$object->sendMessage($user,$groupChatID,$arr);
echo "<pre class="brush:php;toolbar:false">";
print_r($d);
echo "
";die;

5、建立getNewMessage.php(测试用户获取新消息功能)

include &#39;./ManyPullMessage.class.php&#39;;
$object=new ManyPullMessage(&#39;192.168.95.11&#39;);
#用户获取新消息
$e=$object->getNewMessage(&#39;jane2&#39;);
echo "<pre class="brush:php;toolbar:false">";
print_r($e);
echo "
";die;

6、建立getPartMessage.php(测试用户获取某个群组部分消息)

(多发送几条消息,用于测试。568中共18条数据)

include &#39;./ManyPullMessage.class.php&#39;;
$object=new ManyPullMessage(&#39;192.168.95.11&#39;);
#用户获取某个群组部分消息
$f=$object->getPartMessage(&#39;jane2&#39;, 568, 1, 10); 
echo "<pre class="brush:php;toolbar:false">";
print_r($f);
echo "
";die;

page=1,size=10

page=2,size=10

测试完毕,还需要别的功能可以自己进行修改添加测试。

以上就是本文的全部内容,希望对大家的学习有所帮助。


相关推荐:

PHP基于Redis消息队列发布微博的方法详解

PHPSession入库/存入redis的方法详解

PHP를 사용하여 Redis 대기열에서 전자상거래 주문 접수를 자동으로 확인하는 방법

위 내용은 PHP Redis의 애플리케이션 메시징에 대한 자세한 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.