検索
ホームページデータベースmysql チュートリアル基于Hiredis异步API的聊天系统实现

基于Hiredis异步API的聊天系统实现

Jun 07, 2016 pm 02:51 PM
apiに基づく成し遂げる非同期システムチャット

基于Hiredis异步API的聊天系统实现 上一篇文章http://blog.csdn.net/qq_34788352/article/details/51313027使用Hiredis的同步API实现了发送消息的客户端,当我使用同步API实现订阅频道客户端时,一旦订阅频道,就会出现无法操作的情况,这是就是同步和异步的

基于Hiredis异步API的聊天系统实现

上一篇文章http://blog.csdn.net/qq_34788352/article/details/51313027使用Hiredis的同步API实现了发送消息的客户端,当我使用同步API实现订阅频道客户端时,一旦订阅频道,就会出现无法操作的情况,这是就是同步和异步的问题。使用同步API,订阅频道后,客户端会进入阻塞状态,等待订阅频道发布的消息,不能实现既订阅频道,又能发布消息的功能。为了实现一个客户端既能订阅频道,又能发布消息的功能,就需要使用Hiredis的异步API。

首先特别感谢黄天霸、tickTick、vah101,当我遇到各种各样奇怪问题的时候,你们的帖子给与了我解答,谢谢。

开始正题,hiredis异步的实现主要是依靠redis自带的ae事件库或者libev事件库或者libevent的事件库或者libuv事件库。网上一些人是通过libevent事件库来实现,本系统则使用Redis自带ae事件库来实现,Redis不用libevent事件库而选择重写一个ae事件库,必定有其道理。

首先介绍使用到的异步API,位于async.h中:

<code class=" hljs perl">redisAsyncContext <span class="hljs-variable">*redisAsyncConnect</span>(const char <span class="hljs-variable">*ip</span>, <span class="hljs-keyword">int</span> port); <span class="hljs-regexp">//</span>用于建立异步连接

<span class="hljs-keyword">int</span> redisAsyncSetConnectCallback(redisAsyncContext <span class="hljs-variable">*ac</span>, redisConnectCallback <span class="hljs-variable">*fn</span>);  <span class="hljs-regexp">//</span>设置连接回调函数,回调函数形式:void callback(const redisAsyncContext <span class="hljs-variable">*c</span>, <span class="hljs-keyword">int</span> status);

<span class="hljs-keyword">int</span> redisAsyncSetDisconnectCallback(redisAsyncContext <span class="hljs-variable">*ac</span>, redisDisconnectCallback <span class="hljs-variable">*fn</span>);  <span class="hljs-regexp">//</span>设置断开连接回调函数,回调函数形式:void callback(const redisAsyncContext <span class="hljs-variable">*c</span>, <span class="hljs-keyword">int</span> status);

void redisAsyncDisconnect(redisAsyncContext <span class="hljs-variable">*ac</span>); <span class="hljs-regexp">//</span>断开异步连接

void redisAsyncFree(redisAsyncContext <span class="hljs-variable">*ac</span>); <span class="hljs-regexp">//</span>释放建立连接时,创建的redisAsyncContext结构

<span class="hljs-keyword">int</span> redisAsyncCommand(redisAsyncContext <span class="hljs-variable">*ac</span>, redisCallbackFn <span class="hljs-variable">*fn</span>, void <span class="hljs-variable">*privdata</span>, const char <span class="hljs-variable">*format</span>, ...); <span class="hljs-regexp">//</span>发送Redis命令,需要实现一个回调函数来出来命令的返回,fn是回调函数的地址,回调函数形式:void callback(redisAsyncContext <span class="hljs-variable">*c</span>, void <span class="hljs-variable">*reply</span>, void <span class="hljs-variable">*pridata</span>);
</code>

有了上面的异步API,就可以开始客户端的撰写。
首先封装订阅端

<code class=" hljs cpp"><span class="hljs-comment">//sub_client.h</span>

<span class="hljs-preprocessor">#ifndef REDIS_SUB_CLIENT_H  </span>
<span class="hljs-preprocessor">#define REDIS_SUB_CLIENT_H  </span>

<span class="hljs-keyword">extern</span> <span class="hljs-string">"C"</span>
{  
<span class="hljs-preprocessor">#include <stdlib.h>  </span>
<span class="hljs-preprocessor">#include <async.h>  </span>
<span class="hljs-preprocessor">#include <adapters/ae.h> </span>
<span class="hljs-preprocessor">#include <unistd.h>  </span>
<span class="hljs-preprocessor">#include <pthread.h>  </span>
<span class="hljs-preprocessor">#include <semaphore.h> </span>
} 

<span class="hljs-preprocessor">#include <string>  </span>
<span class="hljs-preprocessor">#include <vector>  </span>

<span class="hljs-keyword">using</span> <span class="hljs-keyword">namespace</span> <span class="hljs-built_in">std</span>;

<span class="hljs-keyword">class</span> CRedisSubClient  
{  
<span class="hljs-keyword">public</span>:   

    CRedisSubClient();  
    ~CRedisSubClient();  

    <span class="hljs-keyword">bool</span> init();   <span class="hljs-comment">//初始化,事件对象,信号量  </span>
    <span class="hljs-keyword">bool</span> uninit();  <span class="hljs-comment">//释放对象</span>
    <span class="hljs-keyword">bool</span> connect();  <span class="hljs-comment">//连接服务器</span>
    <span class="hljs-keyword">bool</span> disconnect(); <span class="hljs-comment">//断开服务器</span>

    <span class="hljs-comment">//订阅频道 </span>
    <span class="hljs-keyword">bool</span> subscribe(<span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &channel_name);  

<span class="hljs-keyword">private</span>:  
    <span class="hljs-comment">// 下面三个回调函数供redis服务调用  </span>
    <span class="hljs-comment">// 连接回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> connect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
        <span class="hljs-keyword">int</span> status);  

    <span class="hljs-comment">// 断开连接的回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> disconnect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
        <span class="hljs-keyword">int</span> status);  

    <span class="hljs-comment">// 执行命令回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> command_callback(redisAsyncContext *redis_context,  
        <span class="hljs-keyword">void</span> *reply, <span class="hljs-keyword">void</span> *privdata);  

    <span class="hljs-comment">// 事件分发线程函数  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> *event_thread(<span class="hljs-keyword">void</span> *data);  
    <span class="hljs-keyword">void</span> *event_proc();  

<span class="hljs-keyword">private</span>:  
    <span class="hljs-comment">// ae事件对象  </span>
    aeEventLoop *loop;  
    <span class="hljs-comment">// 事件线程ID  </span>
    pthread_t _event_thread;  
    <span class="hljs-comment">// 事件线程的信号量  </span>
    sem_t _event_sem;  
    <span class="hljs-comment">// hiredis异步对象  </span>
    redisAsyncContext *_redis_context;  

};  

<span class="hljs-preprocessor">#endif  </span>
</code>

使用extern “C”是因为redis和hiredis都是c写的,当使用c++链接c代码生成的库中的函数时,会出现undefined reference的问题。

订阅端的实现

<code class=" hljs cpp"><span class="hljs-comment">//sub_client.cpp</span>

<span class="hljs-preprocessor">#include <stddef.h>  </span>
<span class="hljs-preprocessor">#include <assert.h>  </span>
<span class="hljs-preprocessor">#include <string.h>  </span>
<span class="hljs-preprocessor">#include "sub_client.h" </span>

<span class="hljs-keyword">using</span> <span class="hljs-keyword">namespace</span> <span class="hljs-built_in">std</span>; 

CRedisSubClient::CRedisSubClient():loop(<span class="hljs-number">0</span>), _event_thread(<span class="hljs-number">0</span>),  
_redis_context(<span class="hljs-number">0</span>)  
{  
}  

CRedisSubClient::~CRedisSubClient()  
{  
}  

<span class="hljs-keyword">bool</span> CRedisSubClient::init()  
{  

    loop = aeCreateEventLoop(<span class="hljs-number">64</span>);    <span class="hljs-comment">// 创建ae对象</span>

    <span class="hljs-keyword">if</span> (NULL == loop)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Create redis event failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-built_in">memset</span>(&_event_sem, <span class="hljs-number">0</span>, <span class="hljs-keyword">sizeof</span>(_event_sem)); 

    <span class="hljs-keyword">int</span> ret = sem_init(&_event_sem, <span class="hljs-number">0</span>, <span class="hljs-number">0</span>);  <span class="hljs-comment">//初始化线程信号量</span>

    <span class="hljs-keyword">if</span> (ret != <span class="hljs-number">0</span>)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Init sem failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisSubClient::uninit()  
{  
    loop = NULL;  

    sem_destroy(&_event_sem);  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisSubClient::connect()  
{    
    _redis_context = redisAsyncConnect(<span class="hljs-string">"127.0.0.1"</span>, <span class="hljs-number">6379</span>);    <span class="hljs-comment">// 异步连接到redis服务器上,使用6380端口</span>

    <span class="hljs-keyword">if</span> (NULL == _redis_context)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Connect redis failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-keyword">if</span> (_redis_context->err)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Connect redis error: %d, %s\n"</span>,   
            _redis_context->err, _redis_context->errstr);    <span class="hljs-comment">// 输出错误信息  </span>
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    redisAeAttach(loop,_redis_context);   <span class="hljs-comment">// 将事件绑定到redis context上,使redis的回调跟事件关联  </span>

    <span class="hljs-comment">// 创建事件处理线程  </span>
    <span class="hljs-keyword">int</span> ret = pthread_create(&_event_thread, <span class="hljs-number">0</span>, &CRedisSubscriber::event_thread, <span class="hljs-keyword">this</span>);  

    <span class="hljs-keyword">if</span> (ret != <span class="hljs-number">0</span>)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Create event thread failed.\n"</span>);  
        disconnect();  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-comment">// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态  </span>
    redisAsyncSetConnectCallback(_redis_context,   
        &CRedisSubClient::connect_callback);  

    <span class="hljs-comment">// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连  </span>
    redisAsyncSetDisconnectCallback(_redis_context,  
        &CRedisSubClient::disconnect_callback);  

    <span class="hljs-comment">// 启动事件线程  </span>
    sem_post(&_event_sem);  
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisSubClient::disconnect()  
{  
    <span class="hljs-keyword">if</span> (_redis_context)  
    {  
        redisAsyncDisconnect(_redis_context);  
        redisAsyncFree(_redis_context);  
        _redis_context = NULL;  
    }  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisSubClient::subscribe(<span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &channel_name)  
{  
    <span class="hljs-keyword">int</span> ret = redisAsyncCommand(_redis_context,   
        &CRedisSubscriber::command_callback, <span class="hljs-keyword">this</span>, <span class="hljs-string">"SUBSCRIBE %s"</span>,   
        channel_name.c_str()); <span class="hljs-comment">//订阅一个频道</span>

    <span class="hljs-keyword">if</span> (REDIS_ERR == ret)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Subscribe command failed: %d\n"</span>, ret);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Subscribe success: %s\n"</span>, channel_name.c_str());  
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">void</span> CRedisSubClient::connect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
    <span class="hljs-keyword">int</span> status)  
{  
    <span class="hljs-keyword">if</span> (status != REDIS_OK)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Error: %s\n"</span>, redis_context->errstr);  
    }  
    <span class="hljs-keyword">else</span>  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Redis connected!"</span>);  
    }  
}  

<span class="hljs-keyword">void</span> CRedisSubClient::disconnect_callback(  
    <span class="hljs-keyword">const</span> redisAsyncContext *redis_context, <span class="hljs-keyword">int</span> status)  
{  
    <span class="hljs-keyword">if</span> (status != REDIS_OK)  
    {    
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Error: %s\n"</span>, redis_context->errstr);  
    }  
}  

<span class="hljs-comment">// 消息接收回调函数  </span>
<span class="hljs-keyword">void</span> CRedisSubClient::command_callback(redisAsyncContext *redis_context,  
    <span class="hljs-keyword">void</span> *reply, <span class="hljs-keyword">void</span> *privdata)  
{  
    <span class="hljs-keyword">if</span> (NULL == reply || NULL == privdata) {  
        <span class="hljs-keyword">return</span> ;  
    }  

    redisReply *redis_reply = <span class="hljs-keyword">reinterpret_cast</span><redisReply *>(reply);  

    <span class="hljs-comment">// 订阅接收到的消息是一个带三元素的数组  </span>
    <span class="hljs-keyword">if</span> (redis_reply->type == REDIS_REPLY_ARRAY &&  
    redis_reply->elements == <span class="hljs-number">3</span>)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Recieve message:%s %s %s\n"</span>,  
        redis_reply->element[<span class="hljs-number">0</span>]->str  
        redis_reply->element[<span class="hljs-number">1</span>]->str  
        redis_reply->element[<span class="hljs-number">2</span>]->str);    
    }  
}  

<span class="hljs-keyword">void</span> *CRedisSubClient::event_thread(<span class="hljs-keyword">void</span> *data)  
{  
    <span class="hljs-keyword">if</span> (NULL == data)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Error!\n"</span>);  
        assert(<span class="hljs-keyword">false</span>);  
        <span class="hljs-keyword">return</span> NULL;  
    }  

    CRedisSubClient *self_this = <span class="hljs-keyword">reinterpret_cast</span><CRedisSubClient *>(data);  
    <span class="hljs-keyword">return</span> self_this->event_proc();  
}  

<span class="hljs-keyword">void</span> *CRedisSubClient::event_proc()  
{  
    sem_wait(&_event_sem);  

    <span class="hljs-comment">//进行事件处理循环  </span>
    aeMain(loop);  

    <span class="hljs-keyword">return</span> NULL;  
}  </code>

发布端的封装:

<code class=" hljs cpp"><span class="hljs-comment">//pub_client.h</span>

<span class="hljs-preprocessor">#ifndef REDIS_PUB_CLIENT_H  </span>
<span class="hljs-preprocessor">#define REDIS_PUB_CLIENT_H  </span>

<span class="hljs-keyword">extern</span> <span class="hljs-string">"C"</span>
{  
<span class="hljs-preprocessor">#include <stdlib.h>  </span>
<span class="hljs-preprocessor">#include <async.h>  </span>
<span class="hljs-preprocessor">#include <adapters/ae.h> </span>
<span class="hljs-preprocessor">#include <unistd.h>  </span>
<span class="hljs-preprocessor">#include <pthread.h>  </span>
<span class="hljs-preprocessor">#include <semaphore.h> </span>
} 

<span class="hljs-preprocessor">#include <string>  </span>
<span class="hljs-preprocessor">#include <vector>  </span>

<span class="hljs-keyword">using</span> <span class="hljs-keyword">namespace</span> <span class="hljs-built_in">std</span>;

<span class="hljs-keyword">class</span> CRedisPubClient  
{  
<span class="hljs-keyword">public</span>:   

    CRedisPubClient();  
    ~CRedisPubClient();  

    <span class="hljs-keyword">bool</span> init();   <span class="hljs-comment">//初始化,事件对象,信号量  </span>
    <span class="hljs-keyword">bool</span> uninit();  <span class="hljs-comment">//释放对象</span>
    <span class="hljs-keyword">bool</span> connect();  <span class="hljs-comment">//连接服务器</span>
    <span class="hljs-keyword">bool</span> disconnect();  <span class="hljs-comment">//断开服务器</span>

    <span class="hljs-comment">//订阅频道 </span>
    <span class="hljs-keyword">bool</span> publish(<span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &channel_name, <span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &message);  

<span class="hljs-keyword">private</span>:  
    <span class="hljs-comment">// 下面三个回调函数供redis服务调用  </span>
    <span class="hljs-comment">// 连接回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> connect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
        <span class="hljs-keyword">int</span> status);  

    <span class="hljs-comment">// 断开连接的回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> disconnect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
        <span class="hljs-keyword">int</span> status);  

    <span class="hljs-comment">// 执行命令回调  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> command_callback(redisAsyncContext *redis_context,  
        <span class="hljs-keyword">void</span> *reply, <span class="hljs-keyword">void</span> *privdata);  

    <span class="hljs-comment">// 事件分发线程函数  </span>
    <span class="hljs-keyword">static</span> <span class="hljs-keyword">void</span> *event_thread(<span class="hljs-keyword">void</span> *data);  
    <span class="hljs-keyword">void</span> *event_proc();  

<span class="hljs-keyword">private</span>:  
    <span class="hljs-comment">// ae事件对象  </span>
    aeEventLoop *loop;  
    <span class="hljs-comment">// 事件线程ID  </span>
    pthread_t _event_thread;  
    <span class="hljs-comment">// 事件线程的信号量  </span>
    sem_t _event_sem;  
    <span class="hljs-comment">// hiredis异步对象  </span>
    redisAsyncContext *_redis_context;  

};  

<span class="hljs-preprocessor">#endif  </span></code>

发布端的实现:

<code class=" hljs cpp"><span class="hljs-comment">//pub_client.cpp</span>

<span class="hljs-preprocessor">#include <stddef.h>  </span>
<span class="hljs-preprocessor">#include <assert.h>  </span>
<span class="hljs-preprocessor">#include <string.h>  </span>
<span class="hljs-preprocessor">#include "pub_client.h" </span>

<span class="hljs-keyword">using</span> <span class="hljs-keyword">namespace</span> <span class="hljs-built_in">std</span>; 

CRedisPubClient::CRedisPubClient():loop(<span class="hljs-number">0</span>), _event_thread(<span class="hljs-number">0</span>),  
_redis_context(<span class="hljs-number">0</span>)  
{  
}  

CRedisPubClient::~CRedisPubClient()  
{  
}  

<span class="hljs-keyword">bool</span> CRedisPubClient::init()  
{  

    loop = aeCreateEventLoop(<span class="hljs-number">64</span>);    <span class="hljs-comment">// 创建ae对象</span>

    <span class="hljs-keyword">if</span> (NULL == loop)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Create redis event failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-built_in">memset</span>(&_event_sem, <span class="hljs-number">0</span>, <span class="hljs-keyword">sizeof</span>(_event_sem)); 

    <span class="hljs-keyword">int</span> ret = sem_init(&_event_sem, <span class="hljs-number">0</span>, <span class="hljs-number">0</span>);  <span class="hljs-comment">//初始化线程信号量</span>

    <span class="hljs-keyword">if</span> (ret != <span class="hljs-number">0</span>)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Init sem failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisPubClient::uninit()  
{  
    loop = NULL;  

    sem_destroy(&_event_sem);  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisPubClient::connect()  
{    
    _redis_context = redisAsyncConnect(<span class="hljs-string">"127.0.0.1"</span>, <span class="hljs-number">6379</span>);    <span class="hljs-comment">// 异步连接到redis服务器上,使用6380端口</span>

    <span class="hljs-keyword">if</span> (NULL == _redis_context)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Connect redis failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-keyword">if</span> (_redis_context->err)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Connect redis error: %d, %s\n"</span>,   
            _redis_context->err, _redis_context->errstr);    <span class="hljs-comment">// 输出错误信息  </span>
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    redisAeAttach(loop,_redis_context);   <span class="hljs-comment">// 将事件绑定到redis context上,使redis的回调跟事件关联  </span>

    <span class="hljs-comment">// 创建事件处理线程  </span>
    <span class="hljs-keyword">int</span> ret = pthread_create(&_event_thread, <span class="hljs-number">0</span>, &CRedisSubscriber::event_thread, <span class="hljs-keyword">this</span>);  

    <span class="hljs-keyword">if</span> (ret != <span class="hljs-number">0</span>)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Create event thread failed.\n"</span>);  
        disconnect();  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-comment">// 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态  </span>
    redisAsyncSetConnectCallback(_redis_context,   
        &CRedisSubClient::connect_callback);  

    <span class="hljs-comment">// 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连  </span>
    redisAsyncSetDisconnectCallback(_redis_context,  
        &CRedisSubClient::disconnect_callback);  

    <span class="hljs-comment">// 启动事件线程  </span>
    sem_post(&_event_sem);  
    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisPubClient::disconnect()  
{  
    <span class="hljs-keyword">if</span> (_redis_context)  
    {  
        redisAsyncDisconnect(_redis_context);  
        redisAsyncFree(_redis_context);  
        _redis_context = NULL;  
    }  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;  
}  

<span class="hljs-keyword">bool</span> CRedisPubClient::publish(<span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &channel_name, <span class="hljs-keyword">const</span> <span class="hljs-built_in">string</span> &message)  
{  
    <span class="hljs-keyword">int</span> ret = redisAsyncCommand(_redis_context,   
        &CRedisPublisher::command_callback, <span class="hljs-keyword">this</span>, <span class="hljs-string">"PUBLISH %s %s"</span>,   
        channel_name.c_str(), message.c_str());  <span class="hljs-comment">//发布消息  </span>

    <span class="hljs-keyword">if</span> (REDIS_ERR == ret)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Publish command failed: %d\n"</span>, ret);  
        <span class="hljs-keyword">return</span> <span class="hljs-keyword">false</span>;  
    }  

    <span class="hljs-keyword">return</span> <span class="hljs-keyword">true</span>;    
}  

<span class="hljs-keyword">void</span> CRedisPubClient::connect_callback(<span class="hljs-keyword">const</span> redisAsyncContext *redis_context,  
    <span class="hljs-keyword">int</span> status)  
{  
    <span class="hljs-keyword">if</span> (status != REDIS_OK)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Error: %s\n"</span>, redis_context->errstr);  
    }  
    <span class="hljs-keyword">else</span>  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Redis connected!"</span>);  
    }  
}  

<span class="hljs-keyword">void</span> CRedisPubClient::disconnect_callback(  
    <span class="hljs-keyword">const</span> redisAsyncContext *redis_context, <span class="hljs-keyword">int</span> status)  
{  
    <span class="hljs-keyword">if</span> (status != REDIS_OK)  
    {    
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Error: %s\n"</span>, redis_context->errstr);  
    }  
}  

<span class="hljs-comment">// 消息接收回调函数  </span>
<span class="hljs-keyword">void</span> CRedisPubClient::command_callback(redisAsyncContext *redis_context,  
    <span class="hljs-keyword">void</span> *reply, <span class="hljs-keyword">void</span> *privdata)  
{  
    <span class="hljs-keyword">if</span> (NULL == reply || NULL == privdata) 
    {  
        <span class="hljs-keyword">return</span> ;  
    }  

    redisReply *redis_reply = <span class="hljs-keyword">reinterpret_cast</span><redisReply *>(reply); 

    <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Publish: %s"</span>,redis_reply.str);
}  

<span class="hljs-keyword">void</span> *CRedisPubClient::event_thread(<span class="hljs-keyword">void</span> *data)  
{  
    <span class="hljs-keyword">if</span> (NULL == data)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">": Error!\n"</span>);  
        assert(<span class="hljs-keyword">false</span>);  
        <span class="hljs-keyword">return</span> NULL;  
    }  

    CRedisPubClient *self_this = <span class="hljs-keyword">reinterpret_cast</span><CRedisPubClient *>(data);  
    <span class="hljs-keyword">return</span> self_this->event_proc();  
}  

<span class="hljs-keyword">void</span> *CRedisPubClient::event_proc()  
{  
    sem_wait(&_event_sem);  

    <span class="hljs-comment">//进行事件处理循环  </span>
    aeMain(loop);  

    <span class="hljs-keyword">return</span> NULL;  
}  </code>

测试封装的sub_client和pub_client类:

<code class=" hljs cpp"><span class="hljs-comment">//test_subpub.cpp</span>

<span class="hljs-preprocessor">#include "pub_client.h"</span>
<span class="hljs-preprocessor">#include "sub_client.h"  </span>

<span class="hljs-keyword">int</span> main(<span class="hljs-keyword">int</span> argc, <span class="hljs-keyword">char</span> *argv[])  
{  
    CRedisPubClient publisher;  
    CRedisSubClient subcriber;

    <span class="hljs-keyword">bool</span> ret_pub = publisher.init();  
    <span class="hljs-keyword">bool</span> ret_sub = subcriber.init();

    <span class="hljs-keyword">if</span> (!ret_sub&&!ret_pub)   
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"Init failed.\n"</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-number">0</span>;  
    }  

    ret_pub = publisher.connect(); 
    ret_sub = subcriber.connect(); 

    <span class="hljs-keyword">if</span> (!ret_sub&&!ret_pub)  
    {  
        <span class="hljs-built_in">printf</span>(<span class="hljs-string">"connect failed."</span>);  
        <span class="hljs-keyword">return</span> <span class="hljs-number">0</span>;  
    }  

    subscriber.subcribe(<span class="hljs-string">"sports"</span>);

    <span class="hljs-keyword">while</span> (<span class="hljs-keyword">true</span>)  
    {  
        publisher.publish(<span class="hljs-string">"sports"</span>, <span class="hljs-string">"ball"</span>);  
        sleep(<span class="hljs-number">1</span>);  
    }  

    publisher.disconnect();  
    publisher.uninit();  
    subscriber.disconnect();
    subscriber.disconnect();
    <span class="hljs-keyword">return</span> <span class="hljs-number">0</span>;  

} </code>

终于到了编译链接运行的阶段,我在这个地方卡了快一天,期间各种编译、链接的错误。直接编译链接会出现找到不aeCreateFileEvent,aeDeleteFileEvent,aeMain等等错误。

  1. 先将redis/src文件夹中的ae.c,ae.h,ae_epoll.c,config.h,zmalloc.c,zmalloc.h拷贝至hiredis目录下
  2. 用gcc -c ae.c gcc -c zmalloc.c生成ae.o和zmalloc.o,利用这两个文件生成静态库 ar -r libar.a ae.o zmalloc.o
  3. 然后编译g++ -o test_subpub test_subpub.cpp pub_client.cpp sub_client.cpp -pthread -I ../ -I./ -I ../adapters ../libhiredis.a ../libae.a
  4. 运行./test_subpub

可以看到,客户端中消息在不断地滚动,即同时实现了订阅频道和发送消息的功能。

声明
この記事の内容はネチズンが自主的に寄稿したものであり、著作権は原著者に帰属します。このサイトは、それに相当する法的責任を負いません。盗作または侵害の疑いのあるコンテンツを見つけた場合は、admin@php.cn までご連絡ください。
MySQLのライセンスは、他のデータベースシステムと比較してどうですか?MySQLのライセンスは、他のデータベースシステムと比較してどうですか?Apr 25, 2025 am 12:26 AM

MySQLはGPLライセンスを使用します。 1)GPLライセンスにより、MySQLの無料使用、変更、分布が可能になりますが、変更された分布はGPLに準拠する必要があります。 2)商業ライセンスは、公的な変更を回避でき、機密性を必要とする商用アプリケーションに適しています。

MyisamよりもInnodbを選びますか?MyisamよりもInnodbを選びますか?Apr 25, 2025 am 12:22 AM

Myisamの代わりにInnoDBを選択する場合の状況には、次のものが含まれます。1)トランザクションサポート、2)高い並行性環境、3)高いデータの一貫性。逆に、Myisamを選択する際の状況には、1)主に操作を読む、2)トランザクションサポートは必要ありません。 INNODBは、eコマースプラットフォームなどの高いデータの一貫性とトランザクション処理を必要とするアプリケーションに適していますが、Myisamはブログシステムなどの読み取り集約型およびトランザクションのないアプリケーションに適しています。

MySQLの外国キーの目的を説明してください。MySQLの外国キーの目的を説明してください。Apr 25, 2025 am 12:17 AM

MySQLでは、外部キーの機能は、テーブル間の関係を確立し、データの一貫性と整合性を確保することです。外部キーは、参照整合性チェックとカスケード操作を通じてデータの有効性を維持します。パフォーマンスの最適化に注意し、それらを使用するときに一般的なエラーを避けてください。

MySQLのインデックスのさまざまなタイプは何ですか?MySQLのインデックスのさまざまなタイプは何ですか?Apr 25, 2025 am 12:12 AM

MySQLには、B-Treeインデックス、ハッシュインデックス、フルテキストインデックス、空間インデックスの4つのメインインデックスタイプがあります。 1.B-Treeインデックスは、範囲クエリ、ソート、グループ化に適しており、従業員テーブルの名前列の作成に適しています。 2。HASHインデックスは、同等のクエリに適しており、メモリストレージエンジンのHASH_TABLEテーブルのID列の作成に適しています。 3。フルテキストインデックスは、記事テーブルのコンテンツ列の作成に適したテキスト検索に使用されます。 4.空間インデックスは、地理空間クエリに使用され、場所テーブルのGEOM列での作成に適しています。

MySQLでインデックスをどのように作成しますか?MySQLでインデックスをどのように作成しますか?Apr 25, 2025 am 12:06 AM

tocreateanindexinmysql、usethecreateindexstatement.1)forasinglecolumn、 "createdexidx_lastnameonemployees(lastname);" 2)foracompositeindexを使用して、 "createindexidx_nameonemployees(lastname、firstname);" 3); "3)、" 3)を使用します

MySQLはSQLiteとどのように違いますか?MySQLはSQLiteとどのように違いますか?Apr 24, 2025 am 12:12 AM

MySQLとSQLiteの主な違いは、設計コンセプトと使用法のシナリオです。1。MySQLは、大規模なアプリケーションとエンタープライズレベルのソリューションに適しており、高性能と高い並行性をサポートしています。 2。SQLiteは、モバイルアプリケーションとデスクトップソフトウェアに適しており、軽量で埋め込みやすいです。

MySQLのインデックスとは何ですか?また、パフォーマンスをどのように改善しますか?MySQLのインデックスとは何ですか?また、パフォーマンスをどのように改善しますか?Apr 24, 2025 am 12:09 AM

MySQLのインデックスは、データの取得をスピードアップするために使用されるデータベーステーブル内の1つ以上の列の順序付けられた構造です。 1)インデックスは、スキャンされたデータの量を減らすことにより、クエリ速度を改善します。 2)B-Tree Indexは、バランスの取れたツリー構造を使用します。これは、範囲クエリとソートに適しています。 3)CreateIndexステートメントを使用して、createIndexidx_customer_idonorders(customer_id)などのインデックスを作成します。 4)Composite Indexesは、createIndexIDX_CUSTOMER_ORDERONORDERS(Customer_Id、Order_date)などのマルチコラムクエリを最適化できます。 5)説明を使用してクエリ計画を分析し、回避します

データの一貫性を確保するために、MySQLでトランザクションを使用する方法を説明します。データの一貫性を確保するために、MySQLでトランザクションを使用する方法を説明します。Apr 24, 2025 am 12:09 AM

MySQLでトランザクションを使用すると、データの一貫性が保証されます。 1)StartTransactionを介してトランザクションを開始し、SQL操作を実行して、コミットまたはロールバックで送信します。 2)SavePointを使用してSave Pointを設定して、部分的なロールバックを許可します。 3)パフォーマンスの最適化の提案には、トランザクション時間の短縮、大規模なクエリの回避、分離レベルの使用が合理的に含まれます。

See all articles

ホットAIツール

Undresser.AI Undress

Undresser.AI Undress

リアルなヌード写真を作成する AI 搭載アプリ

AI Clothes Remover

AI Clothes Remover

写真から衣服を削除するオンライン AI ツール。

Undress AI Tool

Undress AI Tool

脱衣画像を無料で

Clothoff.io

Clothoff.io

AI衣類リムーバー

Video Face Swap

Video Face Swap

完全無料の AI 顔交換ツールを使用して、あらゆるビデオの顔を簡単に交換できます。

ホットツール

SecLists

SecLists

SecLists は、セキュリティ テスターの究極の相棒です。これは、セキュリティ評価中に頻繁に使用されるさまざまな種類のリストを 1 か所にまとめたものです。 SecLists は、セキュリティ テスターが必要とする可能性のあるすべてのリストを便利に提供することで、セキュリティ テストをより効率的かつ生産的にするのに役立ちます。リストの種類には、ユーザー名、パスワード、URL、ファジング ペイロード、機密データ パターン、Web シェルなどが含まれます。テスターはこのリポジトリを新しいテスト マシンにプルするだけで、必要なあらゆる種類のリストにアクセスできるようになります。

SublimeText3 Linux 新バージョン

SublimeText3 Linux 新バージョン

SublimeText3 Linux 最新バージョン

DVWA

DVWA

Damn Vulnerable Web App (DVWA) は、非常に脆弱な PHP/MySQL Web アプリケーションです。その主な目的は、セキュリティ専門家が法的環境でスキルとツールをテストするのに役立ち、Web 開発者が Web アプリケーションを保護するプロセスをより深く理解できるようにし、教師/生徒が教室環境で Web アプリケーションを教え/学習できるようにすることです。安全。 DVWA の目標は、シンプルでわかりやすいインターフェイスを通じて、さまざまな難易度で最も一般的な Web 脆弱性のいくつかを実践することです。このソフトウェアは、

ZendStudio 13.5.1 Mac

ZendStudio 13.5.1 Mac

強力な PHP 統合開発環境

Safe Exam Browser

Safe Exam Browser

Safe Exam Browser は、オンライン試験を安全に受験するための安全なブラウザ環境です。このソフトウェアは、あらゆるコンピュータを安全なワークステーションに変えます。あらゆるユーティリティへのアクセスを制御し、学生が無許可のリソースを使用するのを防ぎます。