
MIT 2012 Distributed Course Basic Source Code Analysis-Underlying Communication Implementation

WBOY, 2016-08-20 08:47:39

This section is closely related to the event-management wrapper from the previous section. The code discussed here lives in connection{.h, .cc}.

There are two main classes here: connection and tcpsconn. The connection class serves a single socket, handling reads and writes on it, while the tcpsconn class serves a collection of sockets, accepting new connections and cleaning up dead ones. Let's look at the header file for the details.

class chanmgr {
    public:
        virtual bool got_pdu(connection *c, char *b, int sz) = 0;
        virtual ~chanmgr() {}
};

The first thing we see is this abstract base class. The connection and tcpsconn classes use it by delegation. It has a single method, got_pdu, which plays an important role in the RPC implementation; we will come back to it later.
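To make the delegation concrete, here is a minimal sketch of a manager that implements got_pdu. The names recording_mgr and deliver, and the empty stand-in for connection, are hypothetical and exist only for this illustration. Treating a return value of true as "the PDU was accepted" is also an assumption here.

```cpp
#include <string>
#include <vector>

// Stand-in for the real connection class; only its identity matters here.
class connection {};

// Same interface as in the article.
class chanmgr {
    public:
        virtual bool got_pdu(connection *c, char *b, int sz) = 0;
        virtual ~chanmgr() {}
};

// Hypothetical manager that simply records every PDU it is handed.
class recording_mgr : public chanmgr {
    public:
        bool got_pdu(connection *c, char *b, int sz) override {
            (void)c;  // a real manager would usually note which connection sent it
            pdus.push_back(std::string(b, sz));
            return true;  // assumed meaning: the PDU was accepted
        }
        std::vector<std::string> pdus;
};

// Hypothetical helper showing what a connection does with a complete PDU:
// it hands the bytes to whatever chanmgr it was constructed with.
bool deliver(chanmgr *mgr, connection *c, char *b, int sz) {
    return mgr->got_pdu(c, b, sz);
}
```

The connection never needs to know what the manager does with the bytes, which is why the same connection code can sit under different higher layers.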

connection class

class connection : public aio_callback {
    public:
        // Internal buffer type, used for data being received or written
        struct charbuf {
            charbuf(): buf(NULL), sz(0), solong(0) {}
            charbuf (char *b, int s) : buf(b), sz(s), solong(0){}
            char *buf;
            int sz;
            int solong; // amount of bytes written or read so far
        };
        // m1: chanmgr, f1: socket or file
        connection(chanmgr *m1, int f1, int lossytest=0);
        ~connection();

        int channo() { return fd_; }
        bool isdead();
        void closeconn();

        bool send(char *b, int sz);
        void write_cb(int s);
        void read_cb(int s);
        // Increment / decrement the reference count
        void incref();
        void decref();
        int ref();

        int compare(connection *another);
    private:

        bool readpdu();
        bool writepdu();

        chanmgr *mgr_;
        const int fd_;
        bool dead_;

        charbuf wpdu_; // write pdu
        charbuf rpdu_; // read pdu

        struct timeval create_time_;

        int waiters_;
        int refno_;
        const int lossy_;

        pthread_mutex_t m_;
        pthread_mutex_t ref_m_; // protects updates to the reference count
        pthread_cond_t send_complete_;
        pthread_cond_t send_wait_;
};

This is the definition of the connection class. It inherits from aio_callback, which, as mentioned in the previous section, is the callback class the event manager uses to read or write data. The connection class therefore acts as a callback class itself.

We can see this in the constructor of connection:

connection::connection(chanmgr *m1, int f1, int l1)
: mgr_(m1), fd_(f1), dead_(false),waiters_(0), refno_(1),lossy_(l1)
{

    int flags = fcntl(fd_, F_GETFL, NULL);
    flags |= O_NONBLOCK;  // non-blocking
    fcntl(fd_, F_SETFL, flags);
    // ignore SIGPIPE
    signal(SIGPIPE, SIG_IGN);
    VERIFY(pthread_mutex_init(&m_,0)==0);
    VERIFY(pthread_mutex_init(&ref_m_,0)==0);
    VERIFY(pthread_cond_init(&send_wait_,0)==0);
    VERIFY(pthread_cond_init(&send_complete_,0)==0);

    VERIFY(gettimeofday(&create_time_, NULL) == 0);
    // The event manager adds this object as the read callback for fd_
    PollMgr::Instance()->add_callback(fd_, CB_RDONLY, this);
}
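The constructor puts the descriptor into non-blocking mode before handing it to the event manager. The sketch below isolates that step and shows its visible effect. The helper names set_nonblocking and empty_read_would_block are hypothetical, and a pipe stands in for the socket.

```cpp
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>

// Set O_NONBLOCK on fd while keeping its other status flags.
// Returns true on success.
bool set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0) return false;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}

// Reading from an empty non-blocking pipe does not block: read() returns -1
// and errno is EAGAIN (or EWOULDBLOCK).
bool empty_read_would_block() {
    int p[2];
    if (pipe(p) < 0) return false;
    bool ok = set_nonblocking(p[0]);
    char byte;
    ssize_t n = read(p[0], &byte, 1);
    bool would_block = ok && n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK);
    close(p[0]);
    close(p[1]);
    return would_block;
}
```

This is what lets a callback such as read_cb consume whatever is available and return to the event loop instead of stalling it.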

So what does this class actually do? It handles communication on a given socket. When sending, it first tries to write the whole buffer. If only part of the data can be written, the connection registers itself with the event manager for write events, and the rest is sent in later rounds of the event loop. We can see this in the send function:

bool
connection::send(char *b, int sz)
{
    ScopedLock ml(&m_);
    waiters_++;
    // While alive and the write pdu still holds data, wait for it to drain (finish sending)
    while (!dead_ && wpdu_.buf) {
        VERIFY(pthread_cond_wait(&send_wait_, &m_)==0);
    }
    waiters_--;
    if (dead_) {
        return false;
    }
    wpdu_.buf = b;
    wpdu_.sz = sz;
    wpdu_.solong = 0;

    if (lossy_) {
        if ((random()%100) < lossy_) {
            jsl_log(JSL_DBG_1, "connection::send LOSSY TEST shutdown fd_ %d\n", fd_);
            shutdown(fd_,SHUT_RDWR);
        }
    }

    // The send failed
    if (!writepdu()) {
        dead_ = true;
        VERIFY(pthread_mutex_unlock(&m_) == 0);
        PollMgr::Instance()->block_remove_fd(fd_);
        VERIFY(pthread_mutex_lock(&m_) == 0);
    }else{
        if (wpdu_.solong == wpdu_.sz) {
        }else{
            // should be rare to need to explicitly add write callback
            // The write continues from here: this object is registered as the
            // write callback and the event loop then invokes write_cb,
            // somewhat like a recursion
            PollMgr::Instance()->add_callback(fd_, CB_WRONLY, this);
            while (!dead_ && wpdu_.solong >= 0 && wpdu_.solong < wpdu_.sz) {
                VERIFY(pthread_cond_wait(&send_complete_,&m_) == 0);
            }
        }
    }
    // Clear the write buffer
    bool ret = (!dead_ && wpdu_.solong == wpdu_.sz);
    wpdu_.solong = wpdu_.sz = 0;
    wpdu_.buf = NULL;
    if (waiters_ > 0)
        pthread_cond_broadcast(&send_wait_);  // wake the senders waiting above
    return ret;
}
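The body of writepdu is not shown in this article, so the following is only a sketch of the partial-write bookkeeping that send depends on, under the assumption that solong is advanced by however many bytes the kernel accepted. The helper name write_some is hypothetical.

```cpp
#include <unistd.h>
#include <cerrno>

// Same fields as the article's charbuf.
struct charbuf {
    charbuf(char *b, int s) : buf(b), sz(s), solong(0) {}
    char *buf;
    int sz;
    int solong;  // bytes written so far
};

// Hypothetical helper: write as much of b as fd accepts right now.
// Returns false on a hard error. Returns true otherwise; if b.solong < b.sz
// afterwards, the fd would block and the caller should retry once the fd
// becomes writable again.
bool write_some(int fd, charbuf &b) {
    while (b.solong < b.sz) {
        ssize_t n = write(fd, b.buf + b.solong, b.sz - b.solong);
        if (n < 0) {
            if (errno == EINTR) continue;  // interrupted, just retry
            if (errno == EAGAIN || errno == EWOULDBLOCK) return true;
            return false;
        }
        b.solong += static_cast<int>(n);
    }
    return true;
}
```

With this shape, the check wpdu_.solong == wpdu_.sz in send distinguishes "everything went out at once" from "part of the buffer is still pending and needs the write callback".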

For reading, the connection keeps reading while rpdu_ (the read buffer) is not yet full. Once a complete PDU has been read, it is handed to got_pdu of the chanmgr class for processing.

Note that every message is preceded by its size: the sender first sends the size of the data and then the data itself, and the receiver first reads the size and then reads that many bytes.

Besides sending and receiving data, the connection class has a private member refno_, which is used for reference counting. Reference counting is a very common technique; Python, for example, uses it for object management and destroys an object when its count drops to 0. The same idea applies here, with one extra condition: the object is deleted only when the count reaches 0 and the connection is already dead, as the decref function shows:
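The size-then-data convention can be sketched as a pair of framing helpers. This is an illustration, not the code from connection.cc: the names frame and unframe are hypothetical, and the choice of a 4-byte network-order length that counts only the payload is an assumption made for the sketch.

```cpp
#include <arpa/inet.h>
#include <cstdint>
#include <cstring>
#include <string>

// Prepend a 4-byte big-endian length to the payload.
std::string frame(const std::string &payload) {
    uint32_t n = htonl(static_cast<uint32_t>(payload.size()));
    std::string out(reinterpret_cast<const char *>(&n), sizeof(n));
    out += payload;
    return out;
}

// Try to take one complete message off the front of buf.
// Returns true and removes the message from buf if it is complete;
// returns false and leaves buf untouched if more bytes are needed.
bool unframe(std::string &buf, std::string &payload) {
    uint32_t n;
    if (buf.size() < sizeof(n)) return false;        // length not complete yet
    std::memcpy(&n, buf.data(), sizeof(n));
    n = ntohl(n);
    if (buf.size() - sizeof(n) < n) return false;    // payload not complete yet
    payload = buf.substr(sizeof(n), n);
    buf.erase(0, sizeof(n) + n);
    return true;
}
```

Because TCP is a byte stream, a reader can receive half a message or two messages at once. Knowing the size up front is what lets the receiver tell when rpdu_ is full.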

void
connection::decref()
{
    VERIFY(pthread_mutex_lock(&ref_m_)==0);
    refno_ --;
    VERIFY(refno_>=0);
    // When the reference count reaches 0, destroy the object
    if (refno_==0) {
        VERIFY(pthread_mutex_lock(&m_)==0);
        if (dead_) {
            VERIFY(pthread_mutex_unlock(&ref_m_)==0);
            VERIFY(pthread_mutex_unlock(&m_)==0);
            delete this;
            return;
        }
        VERIFY(pthread_mutex_unlock(&m_)==0);
    }
    pthread_mutex_unlock(&ref_m_);
}
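The core of this pattern, stripped of the dead_ check, can be sketched with standard C++ primitives. The class name refcounted and the destroyed flag are hypothetical; the flag exists only so that the deletion can be observed from outside.

```cpp
#include <mutex>

// Hypothetical reference-counted object that deletes itself when the
// last reference is dropped.
class refcounted {
    public:
        // The creator holds the first reference, as with refno_(1) above.
        explicit refcounted(bool *destroyed) : refs_(1), destroyed_(destroyed) {}

        void incref() {
            std::lock_guard<std::mutex> g(m_);
            ++refs_;
        }

        void decref() {
            bool last;
            {
                std::lock_guard<std::mutex> g(m_);
                last = (--refs_ == 0);
            }  // release the lock before deleting: the mutex lives inside *this
            if (last) delete this;
        }

    private:
        // Private destructor: the object can only go away through decref().
        ~refcounted() { *destroyed_ = true; }

        int refs_;
        bool *destroyed_;
        std::mutex m_;
};
```

As in the article's decref, every lock is released before delete this runs, since destroying a locked mutex is undefined behavior.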

tcpsconn class

This class manages connections. Let's look at its definition first:

/**
 *  Manages client connections, keeping them in a map<int, connection*>
 *
 */
class tcpsconn {
    public:
        tcpsconn(chanmgr *m1, int port, int lossytest=0);
        ~tcpsconn();

        void accept_conn();
    private:

        pthread_mutex_t m_;
        pthread_t th_;
        int pipe_[2];

        int tcp_; // file descriptor for accepting connection
        chanmgr *mgr_;
        int lossy_;
        std::map<int, connection *> conns_;

        void process_accept();
};

You can see that a map is defined inside. The key is the socket file descriptor and the value is the pointer to the corresponding connection object. Let's look at the constructor implementation:

tcpsconn::tcpsconn(chanmgr *m1, int port, int lossytest)
: mgr_(m1), lossy_(lossytest)
{

    VERIFY(pthread_mutex_init(&m_,NULL) == 0);

    struct sockaddr_in sin;
    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET;
    sin.sin_port = htons(port);

    tcp_ = socket(AF_INET, SOCK_STREAM, 0);
    if(tcp_ < 0){
        perror("tcpsconn::tcpsconn accept_loop socket:");
        VERIFY(0);
    }

    int yes = 1;
    // Set the TCP options: reuseaddr, nodelay
    setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes));
    setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes));

    if(bind(tcp_, (sockaddr *)&sin, sizeof(sin)) < 0){
        perror("accept_loop tcp bind:");
        VERIFY(0);
    }

    if(listen(tcp_, 1000) < 0) {
        perror("tcpsconn::tcpsconn listen:");
        VERIFY(0);
    }

    jsl_log(JSL_DBG_2, "tcpsconn::tcpsconn listen on %d %d\n", port,
        sin.sin_port);

    if (pipe(pipe_) < 0) {
        perror("accept_loop pipe:");
        VERIFY(0);
    }

    int flags = fcntl(pipe_[0], F_GETFL, NULL);
    flags |= O_NONBLOCK;
    fcntl(pipe_[0], F_SETFL, flags);  // non-blocking pipe

    VERIFY((th_ = method_thread(this, false, &tcpsconn::accept_conn)) != 0);
}

This constructor sets up the listening socket on the server side and then creates a thread that waits for client connections. When a client connection is accepted later, the new socket is added to the conns_ map, which establishes the mapping from the socket to its connection pointer. conns_ is then traversed to remove dead connections, so that they are cleaned up promptly.
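The body of process_accept is not shown in this article, so the following is only a sketch of the bookkeeping just described, under stated assumptions. The struct conn and the function add_and_reap are hypothetical stand-ins, and the sketch frees dead entries directly where the real class would presumably go through the reference count.

```cpp
#include <map>

// Hypothetical stand-in for connection: just a descriptor and a dead flag.
struct conn {
    explicit conn(int fd) : fd(fd), dead(false) {}
    int fd;
    bool dead;
};

// Register a newly accepted descriptor, then sweep out every dead entry.
// Assumes fd is not already present in the map.
// Returns the number of connections that were reaped.
int add_and_reap(std::map<int, conn *> &conns, int fd) {
    conns[fd] = new conn(fd);
    int reaped = 0;
    for (auto it = conns.begin(); it != conns.end();) {
        if (it->second->dead) {
            delete it->second;     // the real code would drop a reference instead
            it = conns.erase(it);  // erase returns the next valid iterator
            ++reaped;
        } else {
            ++it;
        }
    }
    return reaped;
}
```

Sweeping on every accept keeps the cleanup cost proportional to connection churn and avoids the need for a separate garbage-collection thread.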
