
Implementing a Chat System with the Hiredis Asynchronous API

WBOY (original), 2016-06-07 14:51:12



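The previous article (http://blog.csdn.net/qq_34788352/article/details/51313027) used the synchronous Hiredis API to implement a client that sends messages. When I tried to implement a channel-subscribing client with the synchronous API, the client became unusable as soon as it subscribed. This is the difference between synchronous and asynchronous operation: with the synchronous API, the client blocks after subscribing and waits for messages published to the channel, so it cannot both subscribe to a channel and publish messages. To build a client that does both, we need the asynchronous Hiredis API.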

First, special thanks to 黄天霸, tickTick, and vah101. Whenever I ran into all sorts of strange problems, your posts gave me the answers. Thank you.

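On to the main topic. The hiredis asynchronous API relies on an event library: the ae event library bundled with Redis, libev, libevent, or libuv. Some examples online use libevent; this system uses the ae event library that ships with Redis. Redis chose to write its own ae library rather than depend on libevent, and presumably had good reasons for doing so.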

First, here are the asynchronous API functions used, declared in async.h:

<code class=" hljs perl">redisAsyncContext *redisAsyncConnect(const char *ip, int port); // Establish an asynchronous connection

int redisAsyncSetConnectCallback(redisAsyncContext *ac, redisConnectCallback *fn);  // Set the connect callback; callback form: void callback(const redisAsyncContext *c, int status);

int redisAsyncSetDisconnectCallback(redisAsyncContext *ac, redisDisconnectCallback *fn);  // Set the disconnect callback; callback form: void callback(const redisAsyncContext *c, int status);

void redisAsyncDisconnect(redisAsyncContext *ac); // Close the asynchronous connection

void redisAsyncFree(redisAsyncContext *ac); // Free the redisAsyncContext structure created when connecting

int redisAsyncCommand(redisAsyncContext *ac, redisCallbackFn *fn, void *privdata, const char *format, ...); // Send a Redis command; a callback is needed to handle the reply. fn is the address of the callback; callback form: void callback(redisAsyncContext *c, void *reply, void *privdata);
</code>
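The callbacks above are plain C function pointers, so a C++ class usually registers a static member function and passes `this` through `privdata`. The following is a minimal sketch of that pattern only; `CommandCallback`, `fake_deliver`, and `Listener` are hypothetical stand-ins invented for illustration and are not part of hiredis.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical stand-in for a C callback type such as redisCallbackFn:
// a plain function pointer that receives an opaque reply and the caller's privdata.
typedef void (*CommandCallback)(void *reply, void *privdata);

// Hypothetical stand-in for the library side: it knows nothing about C++
// objects and simply hands privdata back unchanged when a reply arrives.
inline void fake_deliver(CommandCallback fn, void *reply, void *privdata) {
    fn(reply, privdata);
}

class Listener {
public:
    Listener() : received_(0) {}

    // A static member has no hidden `this`, so it matches the C signature.
    static void on_reply(void *reply, void *privdata) {
        if (reply == NULL || privdata == NULL) {
            return;  // nothing to dispatch
        }
        Listener *self = static_cast<Listener *>(privdata);
        self->handle(*static_cast<const std::string *>(reply));
    }

    int received() const { return received_; }
    const std::string &last() const { return last_; }

private:
    // Ordinary member function that does the real work.
    void handle(const std::string &text) {
        ++received_;
        last_ = text;
    }

    int received_;
    std::string last_;
};
```

Registering `&Listener::on_reply` together with a pointer to the object lets one static function serve any number of instances.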

With the asynchronous API above, we can start writing the clients.
First, wrap the subscriber:

<code class=" hljs cpp">//sub_client.h

#ifndef REDIS_SUB_CLIENT_H
#define REDIS_SUB_CLIENT_H

extern "C"
{
#include <stdlib.h>
#include <async.h>
#include <adapters/ae.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
}

#include <string>
#include <vector>

using namespace std;

class CRedisSubClient
{
public:

    CRedisSubClient();
    ~CRedisSubClient();

    bool init();        // Initialize the event loop and the semaphore
    bool uninit();      // Release resources
    bool connect();     // Connect to the server
    bool disconnect();  // Disconnect from the server

    // Subscribe to a channel
    bool subscribe(const string &channel_name);

private:
    // The three callbacks below are invoked by hiredis
    // Connect callback
    static void connect_callback(const redisAsyncContext *redis_context,
        int status);

    // Disconnect callback
    static void disconnect_callback(const redisAsyncContext *redis_context,
        int status);

    // Command reply callback
    static void command_callback(redisAsyncContext *redis_context,
        void *reply, void *privdata);

    // Event dispatch thread functions
    static void *event_thread(void *data);
    void *event_proc();

private:
    // ae event loop
    aeEventLoop *loop;
    // Event thread ID
    pthread_t _event_thread;
    // Semaphore that gates the event thread
    sem_t _event_sem;
    // hiredis asynchronous context
    redisAsyncContext *_redis_context;

};

#endif
</code>
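The header pairs a `pthread_t` with a `sem_t`: the event thread is created early but waits on the semaphore until setup is finished. The sketch below shows just that gating pattern with POSIX threads and semaphores. `GatedWorker` and its members are hypothetical names, and the single assignment in `run()` merely stands in for entering the event loop.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <pthread.h>
#include <semaphore.h>

class GatedWorker {
public:
    GatedWorker() : thread_(), configured_(0), seen_(-1) {}

    // Create the thread; it blocks on the semaphore until open_gate() is called.
    bool start() {
        if (sem_init(&gate_, 0, 0) != 0) {  // initial value 0: gate closed
            return false;
        }
        if (pthread_create(&thread_, NULL, &GatedWorker::entry, this) != 0) {
            sem_destroy(&gate_);
            return false;
        }
        return true;
    }

    // Stands in for setup done after thread creation (e.g. registering callbacks).
    void configure(int value) { configured_ = value; }

    // Release the waiting thread.
    void open_gate() { sem_post(&gate_); }

    // Wait for the thread to finish and report what it observed.
    int join() {
        pthread_join(thread_, NULL);
        sem_destroy(&gate_);
        return seen_;
    }

private:
    // Static trampoline with the signature pthread_create expects.
    static void *entry(void *data) {
        return static_cast<GatedWorker *>(data)->run();
    }

    void *run() {
        // Retry if the wait is interrupted by a signal.
        while (sem_wait(&gate_) == -1 && errno == EINTR) {
        }
        seen_ = configured_;  // stands in for running the event loop
        return NULL;
    }

    pthread_t thread_;
    sem_t gate_;
    int configured_;
    int seen_;
};
```

Because the worker cannot pass `sem_wait` before `sem_post`, anything configured between `start()` and `open_gate()` is visible to it. The sketch assumes a platform where unnamed POSIX semaphores are available, such as Linux.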

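`extern "C"` is used because Redis and hiredis are both written in C. When C++ code includes C declarations without C linkage, the compiler mangles the function names, so linking against the library built from C code fails with undefined reference errors.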
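As a small illustration of the linkage rule, the sketch below declares one function with C linkage using the guard that C headers commonly carry, next to two ordinary C++ overloads. All function names here are hypothetical, and the C function is defined in the same file only to keep the example self-contained; in a real project it would live in a `.c` file compiled by a C compiler.

```cpp
#include <cassert>

// Typical guard in a C header so it can be included from both C and C++.
#ifdef __cplusplus
extern "C" {
#endif

int c_style_add(int a, int b);  // hypothetical C function: unmangled symbol name

#ifdef __cplusplus
}
#endif

// The definition keeps the C linkage given by the declaration above.
int c_style_add(int a, int b) { return a + b; }

// Ordinary C++ functions get mangled names, which is what allows overloading.
int cpp_add(int a, int b) { return a + b; }
double cpp_add(double a, double b) { return a + b; }
```

A header that already contains such a guard does not need to be wrapped again; wrapping is only required for C headers that lack it.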

Subscriber implementation:

<code class=" hljs cpp">//sub_client.cpp

#include <stddef.h>
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include "sub_client.h"

using namespace std;

CRedisSubClient::CRedisSubClient():loop(0), _event_thread(0),
_redis_context(0)
{
}

CRedisSubClient::~CRedisSubClient()
{
}

bool CRedisSubClient::init()
{

    loop = aeCreateEventLoop(64);    // Create the ae event loop

    if (NULL == loop)
    {
        printf("Create redis event failed.\n");
        return false;
    }

    memset(&_event_sem, 0, sizeof(_event_sem));

    int ret = sem_init(&_event_sem, 0, 0);  // Initialize the thread semaphore with value 0

    if (ret != 0)
    {
        printf("Init sem failed.\n");
        return false;
    }

    return true;
}

bool CRedisSubClient::uninit()
{
    // Only the pointer is cleared here; releasing the loop would need
    // aeStop()/aeDeleteEventLoop() once the event thread has exited.
    loop = NULL;

    sem_destroy(&_event_sem);

    return true;
}

bool CRedisSubClient::connect()
{
    _redis_context = redisAsyncConnect("127.0.0.1", 6379);    // Connect asynchronously to the Redis server on port 6379

    if (NULL == _redis_context)
    {
        printf("Connect redis failed.\n");
        return false;
    }

    if (_redis_context->err)
    {
        printf("Connect redis error: %d, %s\n",
            _redis_context->err, _redis_context->errstr);    // Print the error
        return false;
    }

    redisAeAttach(loop,_redis_context);   // Attach the context to the ae event loop so hiredis callbacks are driven by its events

    // Create the event thread (fixed: the class is CRedisSubClient)
    int ret = pthread_create(&_event_thread, 0, &CRedisSubClient::event_thread, this);

    if (ret != 0)
    {
        printf("Create event thread failed.\n");
        disconnect();
        return false;
    }

    // Set the connect callback; hiredis calls it when the connection attempt completes and reports its status
    redisAsyncSetConnectCallback(_redis_context,
        &CRedisSubClient::connect_callback);

    // Set the disconnect callback; it is called when the connection closes, and the caller can use it to reconnect
    redisAsyncSetDisconnectCallback(_redis_context,
        &CRedisSubClient::disconnect_callback);

    // Release the event thread
    sem_post(&_event_sem);
    return true;
}

bool CRedisSubClient::disconnect()
{
    if (_redis_context)
    {
        // redisAsyncDisconnect() frees the context itself after the disconnect
        // callback has fired, so redisAsyncFree() must not be called as well.
        redisAsyncDisconnect(_redis_context);
        _redis_context = NULL;
    }

    return true;
}

bool CRedisSubClient::subscribe(const string &channel_name)
{
    int ret = redisAsyncCommand(_redis_context,
        &CRedisSubClient::command_callback, this, "SUBSCRIBE %s",
        channel_name.c_str()); // Subscribe to a channel

    if (REDIS_ERR == ret)
    {
        printf("Subscribe command failed: %d\n", ret);
        return false;
    }

    // The command has only been queued; the confirmation arrives in command_callback
    printf("Subscribe command sent: %s\n", channel_name.c_str());
    return true;
}

void CRedisSubClient::connect_callback(const redisAsyncContext *redis_context,
    int status)
{
    if (status != REDIS_OK)
    {
        printf("Error: %s\n", redis_context->errstr);
    }
    else
    {
        printf("Redis connected!\n");
    }
}

void CRedisSubClient::disconnect_callback(
    const redisAsyncContext *redis_context, int status)
{
    if (status != REDIS_OK)
    {
        printf("Error: %s\n", redis_context->errstr);
    }
}

// Message receive callback
void CRedisSubClient::command_callback(redisAsyncContext *redis_context,
    void *reply, void *privdata)
{
    if (NULL == reply || NULL == privdata) {
        return ;
    }

    redisReply *redis_reply = static_cast<redisReply *>(reply);

    // A reply received on a subscription is an array with three elements
    if (redis_reply->type == REDIS_REPLY_ARRAY &&
    redis_reply->elements == 3)
    {
        // The third element is an integer (str is NULL) in the "subscribe" confirmation
        const char *payload = redis_reply->element[2]->str;

        printf("Receive message:%s %s %s\n",
        redis_reply->element[0]->str,
        redis_reply->element[1]->str,
        payload ? payload : "");
    }
}

void *CRedisSubClient::event_thread(void *data)
{
    if (NULL == data)
    {
        printf("Error!\n");
        assert(false);
        return NULL;
    }

    CRedisSubClient *self_this = static_cast<CRedisSubClient *>(data);
    return self_this->event_proc();
}

void *CRedisSubClient::event_proc()
{
    sem_wait(&_event_sem);

    // Run the event loop
    aeMain(loop);

    return NULL;
}  </code>
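The three-element arrays handled in `command_callback` come in more than one kind: a subscription confirmation carries an integer as its third element, while a published message carries the payload string. The sketch below classifies them using a simplified stand-in for the reply structure. `MockReply`, `text`, `number`, and `describe_push` are hypothetical names for illustration, not hiredis types or functions.

```cpp
#include <cassert>
#include <string>
#include <vector>

// Hypothetical, simplified stand-in for the redisReply fields used here.
struct MockReply {
    bool is_integer;
    long long integer;
    std::string str;
};

// Helpers that build string and integer elements.
static MockReply text(const std::string &s) {
    MockReply r;
    r.is_integer = false;
    r.integer = 0;
    r.str = s;
    return r;
}

static MockReply number(long long n) {
    MockReply r;
    r.is_integer = true;
    r.integer = n;
    return r;
}

// Pub/sub pushes seen by a subscriber are three-element arrays:
//   ["subscribe", channel, <number of active subscriptions>]
//   ["message",   channel, <payload>]
std::string describe_push(const std::vector<MockReply> &elements) {
    if (elements.size() != 3 || elements[0].is_integer || elements[1].is_integer) {
        return "ignored";
    }
    const std::string &kind = elements[0].str;
    const std::string &channel = elements[1].str;
    const MockReply &last = elements[2];

    if (kind == "message" && !last.is_integer) {
        return channel + ": " + last.str;
    }
    if ((kind == "subscribe" || kind == "unsubscribe") && last.is_integer) {
        return kind + " " + channel + " (" + std::to_string(last.integer) + " active)";
    }
    return "ignored";
}
```

Checking the first element before touching the third avoids reading a string from an element that holds an integer.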

Publisher wrapper:

<code class=" hljs cpp">//pub_client.h

#ifndef REDIS_PUB_CLIENT_H
#define REDIS_PUB_CLIENT_H

extern "C"
{
#include <stdlib.h>
#include <async.h>
#include <adapters/ae.h>
#include <unistd.h>
#include <pthread.h>
#include <semaphore.h>
}

#include <string>
#include <vector>

using namespace std;

class CRedisPubClient
{
public:

    CRedisPubClient();
    ~CRedisPubClient();

    bool init();        // Initialize the event loop and the semaphore
    bool uninit();      // Release resources
    bool connect();     // Connect to the server
    bool disconnect();  // Disconnect from the server

    // Publish a message to a channel
    bool publish(const string &channel_name, const string &message);

private:
    // The three callbacks below are invoked by hiredis
    // Connect callback
    static void connect_callback(const redisAsyncContext *redis_context,
        int status);

    // Disconnect callback
    static void disconnect_callback(const redisAsyncContext *redis_context,
        int status);

    // Command reply callback
    static void command_callback(redisAsyncContext *redis_context,
        void *reply, void *privdata);

    // Event dispatch thread functions
    static void *event_thread(void *data);
    void *event_proc();

private:
    // ae event loop
    aeEventLoop *loop;
    // Event thread ID
    pthread_t _event_thread;
    // Semaphore that gates the event thread
    sem_t _event_sem;
    // hiredis asynchronous context
    redisAsyncContext *_redis_context;

};

#endif  </code>

Publisher implementation:

<code class=" hljs cpp">//pub_client.cpp

#include <stddef.h>
#include <stdio.h>
#include <assert.h>
#include <string.h>
#include "pub_client.h"

using namespace std;

CRedisPubClient::CRedisPubClient():loop(0), _event_thread(0),
_redis_context(0)
{
}

CRedisPubClient::~CRedisPubClient()
{
}

bool CRedisPubClient::init()
{

    loop = aeCreateEventLoop(64);    // Create the ae event loop

    if (NULL == loop)
    {
        printf("Create redis event failed.\n");
        return false;
    }

    memset(&_event_sem, 0, sizeof(_event_sem));

    int ret = sem_init(&_event_sem, 0, 0);  // Initialize the thread semaphore with value 0

    if (ret != 0)
    {
        printf("Init sem failed.\n");
        return false;
    }

    return true;
}

bool CRedisPubClient::uninit()
{
    // Only the pointer is cleared here; releasing the loop would need
    // aeStop()/aeDeleteEventLoop() once the event thread has exited.
    loop = NULL;

    sem_destroy(&_event_sem);

    return true;
}

bool CRedisPubClient::connect()
{
    _redis_context = redisAsyncConnect("127.0.0.1", 6379);    // Connect asynchronously to the Redis server on port 6379

    if (NULL == _redis_context)
    {
        printf("Connect redis failed.\n");
        return false;
    }

    if (_redis_context->err)
    {
        printf("Connect redis error: %d, %s\n",
            _redis_context->err, _redis_context->errstr);    // Print the error
        return false;
    }

    redisAeAttach(loop,_redis_context);   // Attach the context to the ae event loop so hiredis callbacks are driven by its events

    // Create the event thread (fixed: the class is CRedisPubClient)
    int ret = pthread_create(&_event_thread, 0, &CRedisPubClient::event_thread, this);

    if (ret != 0)
    {
        printf("Create event thread failed.\n");
        disconnect();
        return false;
    }

    // Set the connect callback; hiredis calls it when the connection attempt completes and reports its status
    redisAsyncSetConnectCallback(_redis_context,
        &CRedisPubClient::connect_callback);

    // Set the disconnect callback; it is called when the connection closes, and the caller can use it to reconnect
    redisAsyncSetDisconnectCallback(_redis_context,
        &CRedisPubClient::disconnect_callback);

    // Release the event thread
    sem_post(&_event_sem);
    return true;
}

bool CRedisPubClient::disconnect()
{
    if (_redis_context)
    {
        // redisAsyncDisconnect() frees the context itself after the disconnect
        // callback has fired, so redisAsyncFree() must not be called as well.
        redisAsyncDisconnect(_redis_context);
        _redis_context = NULL;
    }

    return true;
}

bool CRedisPubClient::publish(const string &channel_name, const string &message)
{
    int ret = redisAsyncCommand(_redis_context,
        &CRedisPubClient::command_callback, this, "PUBLISH %s %s",
        channel_name.c_str(), message.c_str());  // Publish a message

    if (REDIS_ERR == ret)
    {
        printf("Publish command failed: %d\n", ret);
        return false;
    }

    return true;
}

void CRedisPubClient::connect_callback(const redisAsyncContext *redis_context,
    int status)
{
    if (status != REDIS_OK)
    {
        printf("Error: %s\n", redis_context->errstr);
    }
    else
    {
        printf("Redis connected!\n");
    }
}

void CRedisPubClient::disconnect_callback(
    const redisAsyncContext *redis_context, int status)
{
    if (status != REDIS_OK)
    {
        printf("Error: %s\n", redis_context->errstr);
    }
}

// Command reply callback
void CRedisPubClient::command_callback(redisAsyncContext *redis_context,
    void *reply, void *privdata)
{
    if (NULL == reply || NULL == privdata)
    {
        return ;
    }

    redisReply *redis_reply = static_cast<redisReply *>(reply);

    // PUBLISH replies with an integer: the number of clients that received the message
    if (redis_reply->type == REDIS_REPLY_INTEGER)
    {
        printf("Publish: %lld\n", redis_reply->integer);
    }
}

void *CRedisPubClient::event_thread(void *data)
{
    if (NULL == data)
    {
        printf("Error!\n");
        assert(false);
        return NULL;
    }

    CRedisPubClient *self_this = static_cast<CRedisPubClient *>(data);
    return self_this->event_proc();
}

void *CRedisPubClient::event_proc()
{
    sem_wait(&_event_sem);

    // Run the event loop
    aeMain(loop);

    return NULL;
}  </code>
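In `publish`, each `%s` in `"PUBLISH %s %s"` is sent as one separate command argument, so a message containing spaces still arrives as a single payload. The sketch below shows how a command is laid out in the Redis protocol (RESP) as an array of length-prefixed bulk strings. `encode_resp_command` is a hypothetical helper written for illustration; hiredis performs this formatting internally.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Encode one command as a RESP array of bulk strings:
//   *<argument count>\r\n  followed, per argument, by  $<byte length>\r\n<bytes>\r\n
std::string encode_resp_command(const std::vector<std::string> &args) {
    std::string out = "*" + std::to_string(args.size()) + "\r\n";
    for (std::size_t i = 0; i < args.size(); ++i) {
        out += "$" + std::to_string(args[i].size()) + "\r\n";
        out += args[i];  // copied verbatim: the length prefix delimits it, not whitespace
        out += "\r\n";
    }
    return out;
}
```

Because every argument carries its own byte length, the server never has to split the command on spaces.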

Testing the sub_client and pub_client wrapper classes:

<code class=" hljs cpp">//test_subpub.cpp

#include <stdio.h>
#include "pub_client.h"
#include "sub_client.h"

int main(int argc, char *argv[])
{
    CRedisPubClient publisher;
    CRedisSubClient subscriber;

    bool ret_pub = publisher.init();
    bool ret_sub = subscriber.init();

    // Fail if either client could not be initialized
    if (!ret_sub || !ret_pub)
    {
        printf("Init failed.\n");
        return 1;
    }

    ret_pub = publisher.connect();
    ret_sub = subscriber.connect();

    // Fail if either client could not connect
    if (!ret_sub || !ret_pub)
    {
        printf("connect failed.\n");
        return 1;
    }

    subscriber.subscribe("sports");

    while (true)
    {
        publisher.publish("sports", "ball");
        sleep(1);
    }

    // Not reached while the loop above runs forever; kept to show the intended cleanup order
    publisher.disconnect();
    publisher.uninit();
    subscriber.disconnect();
    subscriber.uninit();
    return 0;

} </code>
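The test program loops forever, so its cleanup calls never run. One possible way to make them reachable, sketched here under stated assumptions, is to leave the loop when a stop flag is set from a signal handler. `request_stop`, `publish_once`, and `run_until_stopped` are hypothetical names, and `publish_once` only stands in for the publish-and-sleep step of the loop.

```cpp
#include <cassert>
#include <csignal>

// Written from the signal handler; sig_atomic_t is the type that is safe for that.
static volatile std::sig_atomic_t g_stop_requested = 0;

// Counts how often the stand-in step ran.
static int g_published = 0;

// Signal handler: only records that a stop was requested.
extern "C" void request_stop(int) { g_stop_requested = 1; }

// Hypothetical stand-in for: publisher.publish("sports", "ball"); sleep(1);
void publish_once() { ++g_published; }

// Run `step` until a stop is requested or max_rounds is reached.
// Returns the number of rounds actually executed.
int run_until_stopped(void (*step)(), int max_rounds) {
    int rounds = 0;
    while (!g_stop_requested && rounds < max_rounds) {
        step();
        ++rounds;
    }
    return rounds;
}
```

After installing the handler with `std::signal(SIGINT, request_stop)`, pressing Ctrl-C ends the loop and control continues with the disconnect and uninit calls that follow it.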

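Finally we reach the compile, link, and run stage. I was stuck here for almost a day, hitting all kinds of compile and link errors. Compiling and linking directly fails because symbols such as aeCreateFileEvent, aeDeleteFileEvent, and aeMain cannot be found.

  1. First copy ae.c, ae.h, ae_epoll.c, config.h, zmalloc.c, and zmalloc.h from the redis/src directory into the hiredis directory.
  2. Run gcc -c ae.c and gcc -c zmalloc.c to produce ae.o and zmalloc.o, then build a static library from these two files: ar -r libae.a ae.o zmalloc.o
  3. Then compile: g++ -o test_subpub test_subpub.cpp pub_client.cpp sub_client.cpp -pthread -I ../ -I./ -I ../adapters ../libhiredis.a ../libae.a
  4. Run ./test_subpub

Messages scroll continuously in the client, which shows that it subscribes to a channel and publishes messages at the same time.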
