搜索
首页php教程php手册MIT 2012 分布式课程基础源码解析-底层通讯实现

本节内容和前节事件管理封装是息息相关的,本节内容主要包含的代码在connection{.h, .cc}中。

这里面最主要的有两个类:connection类和tcpsconn类,connetion类主要服务于单个套接字,包括套接字上的数据读取写入等,而tcpsconn类则是服务于套接字集合,如接收连接,更新失效套接字等。具体我们看头文件。

<span style="color: #0000ff;">class</span><span style="color: #000000;"> chanmgr {
    </span><span style="color: #0000ff;">public</span><span style="color: #000000;">:
        </span><span style="color: #0000ff;">virtual</span> <span style="color: #0000ff;">bool</span> got_pdu(connection *c, <span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span> sz) = <span style="color: #800080;">0</span><span style="color: #000000;">;
        </span><span style="color: #0000ff;">virtual</span> ~<span style="color: #000000;">chanmgr() {}
};</span>

我们首先看到的是这个虚基类类,这个类会以委托的形式用在connection和tcpsconn类中,它只有一个方法即got_pdu,它在RPC实现中扮演着重要角色,后面使用的时候会再次介绍它。

connection类

<span style="color: #008080;"> 1</span> <span style="color: #0000ff;">class</span> connection : <span style="color: #0000ff;">public</span><span style="color: #000000;"> aio_callback {
</span><span style="color: #008080;"> 2</span>     <span style="color: #0000ff;">public</span><span style="color: #000000;">:
</span><span style="color: #008080;"> 3</span>         <span style="color: #008000;">//</span><span style="color: #008000;">内部buffer类,主要用于接收/写入数据的buffer</span>
<span style="color: #008080;"> 4</span>         <span style="color: #0000ff;">struct</span><span style="color: #000000;"> charbuf {
</span><span style="color: #008080;"> 5</span>             charbuf(): buf(NULL), sz(<span style="color: #800080;">0</span>), solong(<span style="color: #800080;">0</span><span style="color: #000000;">) {}
</span><span style="color: #008080;"> 6</span>             charbuf (<span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span> s) : buf(b), sz(s), solong(<span style="color: #800080;">0</span><span style="color: #000000;">){}
</span><span style="color: #008080;"> 7</span>             <span style="color: #0000ff;">char</span> *<span style="color: #000000;">buf;
</span><span style="color: #008080;"> 8</span>             <span style="color: #0000ff;">int</span><span style="color: #000000;"> sz;
</span><span style="color: #008080;"> 9</span>             <span style="color: #0000ff;">int</span> solong; <span style="color: #008000;">//</span><span style="color: #008000;">amount of bytes written or read so far</span>
<span style="color: #008080;">10</span> <span style="color: #000000;">        };
</span><span style="color: #008080;">11</span>         <span style="color: #008000;">//</span><span style="color: #008000;">m1: chanmgr, f1: socket or file, </span>
<span style="color: #008080;">12</span>         connection(chanmgr *m1, <span style="color: #0000ff;">int</span> f1, <span style="color: #0000ff;">int</span> lossytest=<span style="color: #800080;">0</span><span style="color: #000000;">);
</span><span style="color: #008080;">13</span>         ~<span style="color: #000000;">connection();
</span><span style="color: #008080;">14</span> 
<span style="color: #008080;">15</span>         <span style="color: #0000ff;">int</span> channo() { <span style="color: #0000ff;">return</span><span style="color: #000000;"> fd_; }
</span><span style="color: #008080;">16</span>         <span style="color: #0000ff;">bool</span><span style="color: #000000;"> isdead();
</span><span style="color: #008080;">17</span>         <span style="color: #0000ff;">void</span><span style="color: #000000;"> closeconn();
</span><span style="color: #008080;">18</span> 
<span style="color: #008080;">19</span>         <span style="color: #0000ff;">bool</span> send(<span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span><span style="color: #000000;"> sz);
</span><span style="color: #008080;">20</span>         <span style="color: #0000ff;">void</span> write_cb(<span style="color: #0000ff;">int</span><span style="color: #000000;"> s);
</span><span style="color: #008080;">21</span>         <span style="color: #0000ff;">void</span> read_cb(<span style="color: #0000ff;">int</span><span style="color: #000000;"> s);
</span><span style="color: #008080;">22</span>         <span style="color: #008000;">//</span><span style="color: #008000;">增加/减少引用计数</span>
<span style="color: #008080;">23</span>         <span style="color: #0000ff;">void</span><span style="color: #000000;"> incref();
</span><span style="color: #008080;">24</span>         <span style="color: #0000ff;">void</span><span style="color: #000000;"> decref();
</span><span style="color: #008080;">25</span>         <span style="color: #0000ff;">int</span> <span style="color: #0000ff;">ref</span><span style="color: #000000;">();
</span><span style="color: #008080;">26</span>                 
<span style="color: #008080;">27</span>         <span style="color: #0000ff;">int</span> compare(connection *<span style="color: #000000;">another);
</span><span style="color: #008080;">28</span>     <span style="color: #0000ff;">private</span><span style="color: #000000;">:
</span><span style="color: #008080;">29</span> 
<span style="color: #008080;">30</span>         <span style="color: #0000ff;">bool</span><span style="color: #000000;"> readpdu();
</span><span style="color: #008080;">31</span>         <span style="color: #0000ff;">bool</span><span style="color: #000000;"> writepdu();
</span><span style="color: #008080;">32</span> 
<span style="color: #008080;">33</span>         chanmgr *<span style="color: #000000;">mgr_;
</span><span style="color: #008080;">34</span>         <span style="color: #0000ff;">const</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> fd_;
</span><span style="color: #008080;">35</span>         <span style="color: #0000ff;">bool</span><span style="color: #000000;"> dead_;
</span><span style="color: #008080;">36</span> 
<span style="color: #008080;">37</span>         charbuf wpdu_; <span style="color: #008000;">//</span><span style="color: #008000;">write pdu</span>
<span style="color: #008080;">38</span>         charbuf rpdu_; <span style="color: #008000;">//</span><span style="color: #008000;">read pdu</span>
<span style="color: #008080;">39</span>                 
<span style="color: #008080;">40</span>         <span style="color: #0000ff;">struct</span><span style="color: #000000;"> timeval create_time_;
</span><span style="color: #008080;">41</span> 
<span style="color: #008080;">42</span>         <span style="color: #0000ff;">int</span><span style="color: #000000;"> waiters_;
</span><span style="color: #008080;">43</span>         <span style="color: #0000ff;">int</span><span style="color: #000000;"> refno_;
</span><span style="color: #008080;">44</span>         <span style="color: #0000ff;">const</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> lossy_;
</span><span style="color: #008080;">45</span> 
<span style="color: #008080;">46</span> <span style="color: #000000;">        pthread_mutex_t m_;
</span><span style="color: #008080;">47</span>         pthread_mutex_t ref_m_; <span style="color: #008000;">//</span><span style="color: #008000;">保护更新引用计数的安全性</span>
<span style="color: #008080;">48</span> <span style="color: #000000;">        pthread_cond_t send_complete_;
</span><span style="color: #008080;">49</span> <span style="color: #000000;">        pthread_cond_t send_wait_;
</span><span style="color: #008080;">50</span> };
View Code

这段代码即是connetion类的定义,它继承至aio_callback,在上一节说过,aio_callback在事件管理类中作为回调类,读取或写入数据,现在connection类就相当于一个回调类。

我们从connection的构造函数中便可以得知。

connection::connection(chanmgr *m1, <span style="color: #0000ff;">int</span> f1, <span style="color: #0000ff;">int</span><span style="color: #000000;"> l1) 
: mgr_(m1), fd_(f1), dead_(</span><span style="color: #0000ff;">false</span>),waiters_(<span style="color: #800080;">0</span>), refno_(<span style="color: #800080;">1</span><span style="color: #000000;">),lossy_(l1)
{

    </span><span style="color: #0000ff;">int</span> flags =<span style="color: #000000;"> fcntl(fd_, F_GETFL, NULL);
    flags </span>|= O_NONBLOCK;  <span style="color: #008000;">//</span><span style="color: #008000;">no blocking</span>
<span style="color: #000000;">    fcntl(fd_, F_SETFL, flags);
    </span><span style="color: #008000;">//</span><span style="color: #008000;">ignore信号</span>
<span style="color: #000000;">    signal(SIGPIPE, SIG_IGN);
    VERIFY(pthread_mutex_init(</span>&m_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    VERIFY(pthread_mutex_init(</span>&ref_m_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    VERIFY(pthread_cond_init(</span>&send_wait_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    VERIFY(pthread_cond_init(</span>&send_complete_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">);
 
       VERIFY(gettimeofday(</span>&create_time_, NULL) == <span style="color: #800080;">0</span><span style="color: #000000;">); 
       </span><span style="color: #008000;">//</span><span style="color: #008000;">事件管理类将本类作为回调类添加到相应的事件管理数组中</span>
    PollMgr::Instance()->add_callback(fd_, CB_RDONLY, <span style="color: #0000ff;">this</span><span style="color: #000000;">);
}</span>

 那这个类的具体作用是啥呢?其实它就是用于在给定套接字上通信用的,对于发送数据,会发送直到数据发送完成为止,未发送完成则会将该事件添加到事件管理中,在下一轮事件循环中继续发送,这一点我们可以从send函数中看出:

<span style="color: #0000ff;">bool</span><span style="color: #000000;">
connection::send(</span><span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span><span style="color: #000000;"> sz)
{
    ScopedLock ml(</span>&<span style="color: #000000;">m_);
    waiters_</span>++<span style="color: #000000;">;
    </span><span style="color: #008000;">//</span><span style="color: #008000;">当活着,且write pdu中还有数据时等待数据清空(发送完)</span>
    <span style="color: #0000ff;">while</span> (!dead_ &&<span style="color: #000000;"> wpdu_.buf) {
        VERIFY(pthread_cond_wait(</span>&send_wait_, &m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    }
    waiters_</span>--<span style="color: #000000;">;
    </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (dead_) {
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">false</span><span style="color: #000000;">;
    }
    wpdu_.buf </span>=<span style="color: #000000;"> b;
    wpdu_.sz </span>=<span style="color: #000000;"> sz;
    wpdu_.solong </span>= <span style="color: #800080;">0</span><span style="color: #000000;">;

    </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (lossy_) {
        </span><span style="color: #0000ff;">if</span> ((random()%<span style="color: #800080;">100</span>)  lossy_) {
            jsl_log(JSL_DBG_1, <span style="color: #800000;">"</span><span style="color: #800000;">connection::send LOSSY TEST shutdown fd_ %d\n</span><span style="color: #800000;">"</span><span style="color: #000000;">, fd_);
            shutdown(fd_,SHUT_RDWR);
        }
    }

    </span><span style="color: #008000;">//</span><span style="color: #008000;">发送失败时</span>
    <span style="color: #0000ff;">if</span> (!<span style="color: #000000;">writepdu()) {
        dead_ </span>= <span style="color: #0000ff;">true</span><span style="color: #000000;">;
        VERIFY(pthread_mutex_unlock(</span>&m_) == <span style="color: #800080;">0</span><span style="color: #000000;">);
        PollMgr::Instance()</span>-><span style="color: #000000;">block_remove_fd(fd_);
        VERIFY(pthread_mutex_lock(</span>&m_) == <span style="color: #800080;">0</span><span style="color: #000000;">);
    }</span><span style="color: #0000ff;">else</span><span style="color: #000000;">{
        </span><span style="color: #0000ff;">if</span> (wpdu_.solong ==<span style="color: #000000;"> wpdu_.sz) {
        }</span><span style="color: #0000ff;">else</span><span style="color: #000000;">{
            </span><span style="color: #008000;">//</span><span style="color: #008000;">should be rare to need to explicitly add write callback
            </span><span style="color: #008000;">//</span><span style="color: #008000;">这会继续写,因为这会添加本类(回调),然后调用里面的回调函数write_cb,
            </span><span style="color: #008000;">//</span><span style="color: #008000;">就像是一个递归</span>
            PollMgr::Instance()->add_callback(fd_, CB_WRONLY, <span style="color: #0000ff;">this</span><span style="color: #000000;">); 
            </span><span style="color: #0000ff;">while</span> (!dead_ && wpdu_.solong >= <span style="color: #800080;">0</span> && wpdu_.solong  wpdu_.sz) {
                VERIFY(pthread_cond_wait(&send_complete_,&m_) == <span style="color: #800080;">0</span><span style="color: #000000;">);
            }
        }
    }
    </span><span style="color: #008000;">//</span><span style="color: #008000;">清空写buffer</span>
    <span style="color: #0000ff;">bool</span> ret = (!dead_ && wpdu_.solong ==<span style="color: #000000;"> wpdu_.sz);
    wpdu_.solong </span>= wpdu_.sz = <span style="color: #800080;">0</span><span style="color: #000000;">;
    wpdu_.buf </span>=<span style="color: #000000;"> NULL;
    </span><span style="color: #0000ff;">if</span> (waiters_ > <span style="color: #800080;">0</span><span style="color: #000000;">)
        pthread_cond_broadcast(</span>&send_wait_);  <span style="color: #008000;">//</span><span style="color: #008000;">唤醒上面的等待</span>
    <span style="color: #0000ff;">return</span><span style="color: #000000;"> ret;
}</span>
send

对于读取数据,则当rpdu_(read buffer)未满时继续读,读取完成后就是用chanmgr类的got_pdu处理读取后的数据。

注意发送数据/接收数据都会首先发送数据大小/接收数据大小,然后再做后续发送数据/接收数据的工作。

除了connection类的发送/接收数据的功能外,我们还看到一个私有变量refno_变量,该变量的作用是用于引用计数,引用计数是一种很常见的编程技巧,例如在python中,引用计数用于对象的管理,当引用计数为0时,对象便会销毁,这里的引用计数也是也是同样的道理,这一点可以从decref函数中得知

<span style="color: #0000ff;">void</span><span style="color: #000000;">
connection::decref()
{
    VERIFY(pthread_mutex_lock(</span>&ref_m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    refno_ </span>--<span style="color: #000000;">;
    VERIFY(refno_</span>>=<span style="color: #800080;">0</span><span style="color: #000000;">);
    </span><span style="color: #008000;">//</span><span style="color: #008000;">当引用计数为0时,销毁对象</span>
    <span style="color: #0000ff;">if</span> (refno_==<span style="color: #800080;">0</span><span style="color: #000000;">) {
        VERIFY(pthread_mutex_lock(</span>&m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
        </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (dead_) {
            VERIFY(pthread_mutex_unlock(</span>&ref_m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
            VERIFY(pthread_mutex_unlock(</span>&m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
            </span><span style="color: #0000ff;">delete</span> <span style="color: #0000ff;">this</span><span style="color: #000000;">;
            </span><span style="color: #0000ff;">return</span><span style="color: #000000;">;
        }
        VERIFY(pthread_mutex_unlock(</span>&m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    }
    pthread_mutex_unlock(</span>&<span style="color: #000000;">ref_m_);
}</span>

tcpscon类:

这个类则是用于管理connection的,我们先看它的定义

<span style="color: #008000;">/*</span><span style="color: #008000;">*
 *  管理客户连接,将连接放入一个map中map<int connection>
 *
 </int></span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">class</span><span style="color: #000000;"> tcpsconn {
    </span><span style="color: #0000ff;">public</span><span style="color: #000000;">:
        tcpsconn(chanmgr </span>*m1, <span style="color: #0000ff;">int</span> port, <span style="color: #0000ff;">int</span> lossytest=<span style="color: #800080;">0</span><span style="color: #000000;">);
        </span>~<span style="color: #000000;">tcpsconn();

        </span><span style="color: #0000ff;">void</span><span style="color: #000000;"> accept_conn();
    </span><span style="color: #0000ff;">private</span><span style="color: #000000;">:

        pthread_mutex_t m_;
        pthread_t th_;
        </span><span style="color: #0000ff;">int</span> pipe_[<span style="color: #800080;">2</span><span style="color: #000000;">];

        </span><span style="color: #0000ff;">int</span> tcp_; <span style="color: #008000;">//</span><span style="color: #008000;">file desciptor for accepting connection</span>
        chanmgr *<span style="color: #000000;">mgr_;
        </span><span style="color: #0000ff;">int</span><span style="color: #000000;"> lossy_;
        std::map</span>int, connection *><span style="color: #000000;"> conns_;

        </span><span style="color: #0000ff;">void</span><span style="color: #000000;"> process_accept();
};</span>

可看到里面定义了一个map,该map的key其实是connection类指针对应的套接字,我们看构造函数实现

tcpsconn::tcpsconn(chanmgr *m1, <span style="color: #0000ff;">int</span> port, <span style="color: #0000ff;">int</span><span style="color: #000000;"> lossytest) 
: mgr_(m1), lossy_(lossytest)
{

    VERIFY(pthread_mutex_init(</span>&m_,NULL) == <span style="color: #800080;">0</span><span style="color: #000000;">);

    </span><span style="color: #0000ff;">struct</span><span style="color: #000000;"> sockaddr_in sin;
    memset(</span>&sin, <span style="color: #800080;">0</span>, <span style="color: #0000ff;">sizeof</span><span style="color: #000000;">(sin));
    sin.sin_family </span>=<span style="color: #000000;"> AF_INET;
    sin.sin_port </span>=<span style="color: #000000;"> htons(port);

    tcp_ </span>= socket(AF_INET, SOCK_STREAM, <span style="color: #800080;">0</span><span style="color: #000000;">);
    </span><span style="color: #0000ff;">if</span>(tcp_ 0<span style="color: #000000;">){
        perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">tcpsconn::tcpsconn accept_loop socket:</span><span style="color: #800000;">"</span><span style="color: #000000;">);
        VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">);
    }

    </span><span style="color: #0000ff;">int</span> yes = <span style="color: #800080;">1</span><span style="color: #000000;">;
    </span><span style="color: #008000;">//</span><span style="color: #008000;">设置TCP参数, reuseaddr, nodelay</span>
    setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, <span style="color: #0000ff;">sizeof</span><span style="color: #000000;">(yes));
    setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, </span>&yes, <span style="color: #0000ff;">sizeof</span><span style="color: #000000;">(yes));

    </span><span style="color: #0000ff;">if</span>(bind(tcp_, (sockaddr *)&sin, <span style="color: #0000ff;">sizeof</span>(sin)) 0<span style="color: #000000;">){
        perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">accept_loop tcp bind:</span><span style="color: #800000;">"</span><span style="color: #000000;">);
        VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">);
    }

    </span><span style="color: #0000ff;">if</span>(listen(tcp_, <span style="color: #800080;">1000</span>) 0<span style="color: #000000;">) {
        perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">tcpsconn::tcpsconn listen:</span><span style="color: #800000;">"</span><span style="color: #000000;">);
        VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">);
    }

    jsl_log(JSL_DBG_2, </span><span style="color: #800000;">"</span><span style="color: #800000;">tcpsconn::tcpsconn listen on %d %d\n</span><span style="color: #800000;">"</span><span style="color: #000000;">, port, 
        sin.sin_port);

    </span><span style="color: #0000ff;">if</span> (pipe(pipe_) 0<span style="color: #000000;">) {
        perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">accept_loop pipe:</span><span style="color: #800000;">"</span><span style="color: #000000;">);
        VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">);
    }

    </span><span style="color: #0000ff;">int</span> flags = fcntl(pipe_[<span style="color: #800080;">0</span><span style="color: #000000;">], F_GETFL, NULL);
    flags </span>|=<span style="color: #000000;"> O_NONBLOCK;
    fcntl(pipe_[</span><span style="color: #800080;">0</span>], F_SETFL, flags);  <span style="color: #008000;">//</span><span style="color: #008000;">无阻塞管道</span>
<span style="color: #000000;">
    VERIFY((th_ </span>= method_thread(<span style="color: #0000ff;">this</span>, <span style="color: #0000ff;">false</span>, &tcpsconn::accept_conn)) != <span style="color: #800080;">0</span><span style="color: #000000;">); 
}</span>
View Code

该构造函数主要是初始化服务器端连接,然后创建一个线程来等待客户端的连接,后面处理客户端连接时,会将连接的客户端套接字添加到conns_的map中,即创建套接字到connection指针的对应关系,然后遍历conns_,清除死亡的connection,从而达到及时处理死亡连接的效果。

 

声明
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn

热AI工具

Undresser.AI Undress

Undresser.AI Undress

人工智能驱动的应用程序,用于创建逼真的裸体照片

AI Clothes Remover

AI Clothes Remover

用于从照片中去除衣服的在线人工智能工具。

Undress AI Tool

Undress AI Tool

免费脱衣服图片

Clothoff.io

Clothoff.io

AI脱衣机

AI Hentai Generator

AI Hentai Generator

免费生成ai无尽的。

热门文章

R.E.P.O.能量晶体解释及其做什么(黄色晶体)
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.最佳图形设置
3 周前By尊渡假赌尊渡假赌尊渡假赌
R.E.P.O.如果您听不到任何人,如何修复音频
3 周前By尊渡假赌尊渡假赌尊渡假赌
WWE 2K25:如何解锁Myrise中的所有内容
4 周前By尊渡假赌尊渡假赌尊渡假赌

热工具

WebStorm Mac版

WebStorm Mac版

好用的JavaScript开发工具

禅工作室 13.0.1

禅工作室 13.0.1

功能强大的PHP集成开发环境

DVWA

DVWA

Damn Vulnerable Web App (DVWA) 是一个PHP/MySQL的Web应用程序,非常容易受到攻击。它的主要目标是成为安全专业人员在合法环境中测试自己的技能和工具的辅助工具,帮助Web开发人员更好地理解保护Web应用程序的过程,并帮助教师/学生在课堂环境中教授/学习Web应用程序安全。DVWA的目标是通过简单直接的界面练习一些最常见的Web漏洞,难度各不相同。请注意,该软件中

Atom编辑器mac版下载

Atom编辑器mac版下载

最流行的的开源编辑器

Dreamweaver CS6

Dreamweaver CS6

视觉化网页开发工具