>php教程 >php手册 >MIT 2012 분산과정 기본 소스코드 분석 - 기초 커뮤니케이션 구현

MIT 2012 분산과정 기본 소스코드 분석 - 기초 커뮤니케이션 구현

WBOY
WBOY원래의
2016-08-20 08:47:391288검색

이 섹션의 내용은 이전 섹션의 이벤트 관리 캡슐화와 밀접한 관련이 있습니다. 이 섹션에 포함된 주요 코드는 {.h, .cc}로 연결됩니다.

여기에는 연결 클래스와 tcpsconn 클래스라는 두 가지 주요 클래스가 있습니다. 연결 클래스는 주로 소켓에서 데이터 읽기 및 쓰기를 포함하여 단일 소켓을 제공하는 반면, tcpsconn 클래스는 연결 수신과 같은 소켓 수집을 제공합니다. 유효하지 않은 소켓 업데이트 등 구체적으로 헤더 파일을 살펴보자.

<span style="color: #0000ff;">class</span><span style="color: #000000;"> chanmgr {
    </span><span style="color: #0000ff;">public</span><span style="color: #000000;">:
        </span><span style="color: #0000ff;">virtual</span> <span style="color: #0000ff;">bool</span> got_pdu(connection *c, <span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span> sz) = <span style="color: #800080;">0</span><span style="color: #000000;">;
        </span><span style="color: #0000ff;">virtual</span> ~<span style="color: #000000;">chanmgr() {}
};</span>

가장 먼저 보이는 것은 이 가상 기본 클래스입니다. 이 클래스는 위임 형태로 연결 및 tcpsconn 클래스에 사용됩니다. 이 클래스에는 RPC 구현에서 중요한 역할을 하는 단 하나의 메소드가 있습니다. 나중에 다시 소개하겠습니다.

접속수업

<span style="color: #008080;"> 1</span> <span style="color: #0000ff;">class</span> connection : <span style="color: #0000ff;">public</span><span style="color: #000000;"> aio_callback {
</span><span style="color: #008080;"> 2</span>     <span style="color: #0000ff;">public</span><span style="color: #000000;">:
</span><span style="color: #008080;"> 3</span>         <span style="color: #008000;">//</span><span style="color: #008000;">内部buffer类,主要用于接收/写入数据的buffer</span>
<span style="color: #008080;"> 4</span>         <span style="color: #0000ff;">struct</span><span style="color: #000000;"> charbuf {
</span><span style="color: #008080;"> 5</span>             charbuf(): buf(NULL), sz(<span style="color: #800080;">0</span>), solong(<span style="color: #800080;">0</span><span style="color: #000000;">) {}
</span><span style="color: #008080;"> 6</span>             charbuf (<span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span> s) : buf(b), sz(s), solong(<span style="color: #800080;">0</span><span style="color: #000000;">){}
</span><span style="color: #008080;"> 7</span>             <span style="color: #0000ff;">char</span> *<span style="color: #000000;">buf;
</span><span style="color: #008080;"> 8</span>             <span style="color: #0000ff;">int</span><span style="color: #000000;"> sz;
</span><span style="color: #008080;"> 9</span>             <span style="color: #0000ff;">int</span> solong; <span style="color: #008000;">//</span><span style="color: #008000;">amount of bytes written or read so far</span>
<span style="color: #008080;">10</span> <span style="color: #000000;">        };
</span><span style="color: #008080;">11</span>         <span style="color: #008000;">//</span><span style="color: #008000;">m1: chanmgr, f1: socket or file, </span>
<span style="color: #008080;">12</span>         connection(chanmgr *m1, <span style="color: #0000ff;">int</span> f1, <span style="color: #0000ff;">int</span> lossytest=<span style="color: #800080;">0</span><span style="color: #000000;">);
</span><span style="color: #008080;">13</span>         ~<span style="color: #000000;">connection();
</span><span style="color: #008080;">14</span> 
<span style="color: #008080;">15</span>         <span style="color: #0000ff;">int</span> channo() { <span style="color: #0000ff;">return</span><span style="color: #000000;"> fd_; }
</span><span style="color: #008080;">16</span>         <span style="color: #0000ff;">bool</span><span style="color: #000000;"> isdead();
</span><span style="color: #008080;">17</span>         <span style="color: #0000ff;">void</span><span style="color: #000000;"> closeconn();
</span><span style="color: #008080;">18</span> 
<span style="color: #008080;">19</span>         <span style="color: #0000ff;">bool</span> send(<span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span><span style="color: #000000;"> sz);
</span><span style="color: #008080;">20</span>         <span style="color: #0000ff;">void</span> write_cb(<span style="color: #0000ff;">int</span><span style="color: #000000;"> s);
</span><span style="color: #008080;">21</span>         <span style="color: #0000ff;">void</span> read_cb(<span style="color: #0000ff;">int</span><span style="color: #000000;"> s);
</span><span style="color: #008080;">22</span>         <span style="color: #008000;">//</span><span style="color: #008000;">增加/减少引用计数</span>
<span style="color: #008080;">23</span>         <span style="color: #0000ff;">void</span><span style="color: #000000;"> incref();
</span><span style="color: #008080;">24</span>         <span style="color: #0000ff;">void</span><span style="color: #000000;"> decref();
</span><span style="color: #008080;">25</span>         <span style="color: #0000ff;">int</span> <span style="color: #0000ff;">ref</span><span style="color: #000000;">();
</span><span style="color: #008080;">26</span>                 
<span style="color: #008080;">27</span>         <span style="color: #0000ff;">int</span> compare(connection *<span style="color: #000000;">another);
</span><span style="color: #008080;">28</span>     <span style="color: #0000ff;">private</span><span style="color: #000000;">:
</span><span style="color: #008080;">29</span> 
<span style="color: #008080;">30</span>         <span style="color: #0000ff;">bool</span><span style="color: #000000;"> readpdu();
</span><span style="color: #008080;">31</span>         <span style="color: #0000ff;">bool</span><span style="color: #000000;"> writepdu();
</span><span style="color: #008080;">32</span> 
<span style="color: #008080;">33</span>         chanmgr *<span style="color: #000000;">mgr_;
</span><span style="color: #008080;">34</span>         <span style="color: #0000ff;">const</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> fd_;
</span><span style="color: #008080;">35</span>         <span style="color: #0000ff;">bool</span><span style="color: #000000;"> dead_;
</span><span style="color: #008080;">36</span> 
<span style="color: #008080;">37</span>         charbuf wpdu_; <span style="color: #008000;">//</span><span style="color: #008000;">write pdu</span>
<span style="color: #008080;">38</span>         charbuf rpdu_; <span style="color: #008000;">//</span><span style="color: #008000;">read pdu</span>
<span style="color: #008080;">39</span>                 
<span style="color: #008080;">40</span>         <span style="color: #0000ff;">struct</span><span style="color: #000000;"> timeval create_time_;
</span><span style="color: #008080;">41</span> 
<span style="color: #008080;">42</span>         <span style="color: #0000ff;">int</span><span style="color: #000000;"> waiters_;
</span><span style="color: #008080;">43</span>         <span style="color: #0000ff;">int</span><span style="color: #000000;"> refno_;
</span><span style="color: #008080;">44</span>         <span style="color: #0000ff;">const</span> <span style="color: #0000ff;">int</span><span style="color: #000000;"> lossy_;
</span><span style="color: #008080;">45</span> 
<span style="color: #008080;">46</span> <span style="color: #000000;">        pthread_mutex_t m_;
</span><span style="color: #008080;">47</span>         pthread_mutex_t ref_m_; <span style="color: #008000;">//</span><span style="color: #008000;">保护更新引用计数的安全性</span>
<span style="color: #008080;">48</span> <span style="color: #000000;">        pthread_cond_t send_complete_;
</span><span style="color: #008080;">49</span> <span style="color: #000000;">        pthread_cond_t send_wait_;
</span><span style="color: #008080;">50</span> };
코드 보기

이 코드는 aio_callback에서 상속받은 코드입니다. 이전 섹션에서 언급했듯이 aio_callback은 데이터를 읽거나 쓰는 데 사용되는 콜백 클래스입니다. 콜백 종류.

연결 생성자를 통해 알 수 있습니다.

connection::connection(chanmgr *m1, <span style="color: #0000ff;">int</span> f1, <span style="color: #0000ff;">int</span><span style="color: #000000;"> l1) 
: mgr_(m1), fd_(f1), dead_(</span><span style="color: #0000ff;">false</span>),waiters_(<span style="color: #800080;">0</span>), refno_(<span style="color: #800080;">1</span><span style="color: #000000;">),lossy_(l1)
{

    </span><span style="color: #0000ff;">int</span> flags =<span style="color: #000000;"> fcntl(fd_, F_GETFL, NULL);
    flags </span>|= O_NONBLOCK;  <span style="color: #008000;">//</span><span style="color: #008000;">no blocking</span>
<span style="color: #000000;">    fcntl(fd_, F_SETFL, flags);
    </span><span style="color: #008000;">//</span><span style="color: #008000;">ignore信号</span>
<span style="color: #000000;">    signal(SIGPIPE, SIG_IGN);
    VERIFY(pthread_mutex_init(</span>&m_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    VERIFY(pthread_mutex_init(</span>&ref_m_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    VERIFY(pthread_cond_init(</span>&send_wait_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    VERIFY(pthread_cond_init(</span>&send_complete_,<span style="color: #800080;">0</span>)==<span style="color: #800080;">0</span><span style="color: #000000;">);
 
       VERIFY(gettimeofday(</span>&create_time_, NULL) == <span style="color: #800080;">0</span><span style="color: #000000;">); 
       </span><span style="color: #008000;">//</span><span style="color: #008000;">事件管理类将本类作为回调类添加到相应的事件管理数组中</span>
    PollMgr::Instance()->add_callback(fd_, CB_RDONLY, <span style="color: #0000ff;">this</span><span style="color: #000000;">);
}</span>

그렇다면 이 클래스의 구체적인 기능은 무엇인가요? 실제로는 특정 소켓에서 통신하는 데 사용되며, 데이터가 전송되지 않을 경우 해당 이벤트가 이벤트 관리에 추가되어 계속 전송됩니다. 이벤트 루프의 다음 라운드에서는 보내기 함수에서 이를 확인할 수 있습니다.

<span style="color: #0000ff;">bool</span><span style="color: #000000;">
connection::send(</span><span style="color: #0000ff;">char</span> *b, <span style="color: #0000ff;">int</span><span style="color: #000000;"> sz)
{
    ScopedLock ml(</span>&<span style="color: #000000;">m_);
    waiters_</span>++<span style="color: #000000;">;
    </span><span style="color: #008000;">//</span><span style="color: #008000;">当活着,且write pdu中还有数据时等待数据清空(发送完)</span>
    <span style="color: #0000ff;">while</span> (!dead_ &&<span style="color: #000000;"> wpdu_.buf) {
        VERIFY(pthread_cond_wait(</span>&send_wait_, &m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    }
    waiters_</span>--<span style="color: #000000;">;
    </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (dead_) {
        </span><span style="color: #0000ff;">return</span> <span style="color: #0000ff;">false</span><span style="color: #000000;">;
    }
    wpdu_.buf </span>=<span style="color: #000000;"> b;
    wpdu_.sz </span>=<span style="color: #000000;"> sz;
    wpdu_.solong </span>= <span style="color: #800080;">0</span><span style="color: #000000;">;

    </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (lossy_) {
        </span><span style="color: #0000ff;">if</span> ((random()%<span style="color: #800080;">100</span>) <<span style="color: #000000;"> lossy_) {
            jsl_log(JSL_DBG_1, </span><span style="color: #800000;">"</span><span style="color: #800000;">connection::send LOSSY TEST shutdown fd_ %d\n</span><span style="color: #800000;">"</span><span style="color: #000000;">, fd_);
            shutdown(fd_,SHUT_RDWR);
        }
    }

    </span><span style="color: #008000;">//</span><span style="color: #008000;">发送失败时</span>
    <span style="color: #0000ff;">if</span> (!<span style="color: #000000;">writepdu()) {
        dead_ </span>= <span style="color: #0000ff;">true</span><span style="color: #000000;">;
        VERIFY(pthread_mutex_unlock(</span>&m_) == <span style="color: #800080;">0</span><span style="color: #000000;">);
        PollMgr::Instance()</span>-><span style="color: #000000;">block_remove_fd(fd_);
        VERIFY(pthread_mutex_lock(</span>&m_) == <span style="color: #800080;">0</span><span style="color: #000000;">);
    }</span><span style="color: #0000ff;">else</span><span style="color: #000000;">{
        </span><span style="color: #0000ff;">if</span> (wpdu_.solong ==<span style="color: #000000;"> wpdu_.sz) {
        }</span><span style="color: #0000ff;">else</span><span style="color: #000000;">{
            </span><span style="color: #008000;">//</span><span style="color: #008000;">should be rare to need to explicitly add write callback
            </span><span style="color: #008000;">//</span><span style="color: #008000;">这会继续写,因为这会添加本类(回调),然后调用里面的回调函数write_cb,
            </span><span style="color: #008000;">//</span><span style="color: #008000;">就像是一个递归</span>
            PollMgr::Instance()->add_callback(fd_, CB_WRONLY, <span style="color: #0000ff;">this</span><span style="color: #000000;">); 
            </span><span style="color: #0000ff;">while</span> (!dead_ && wpdu_.solong >= <span style="color: #800080;">0</span> && wpdu_.solong <<span style="color: #000000;"> wpdu_.sz) {
                VERIFY(pthread_cond_wait(</span>&send_complete_,&m_) == <span style="color: #800080;">0</span><span style="color: #000000;">);
            }
        }
    }
    </span><span style="color: #008000;">//</span><span style="color: #008000;">清空写buffer</span>
    <span style="color: #0000ff;">bool</span> ret = (!dead_ && wpdu_.solong ==<span style="color: #000000;"> wpdu_.sz);
    wpdu_.solong </span>= wpdu_.sz = <span style="color: #800080;">0</span><span style="color: #000000;">;
    wpdu_.buf </span>=<span style="color: #000000;"> NULL;
    </span><span style="color: #0000ff;">if</span> (waiters_ > <span style="color: #800080;">0</span><span style="color: #000000;">)
        pthread_cond_broadcast(</span>&send_wait_);  <span style="color: #008000;">//</span><span style="color: #008000;">唤醒上面的等待</span>
    <span style="color: #0000ff;">return</span><span style="color: #000000;"> ret;
}</span>
보내기

데이터를 읽으려면 rpdu_(읽기 버퍼)가 꽉 찼을 때 계속 읽으세요. 읽기가 완료된 후 chanmgr 클래스의 got_pdu를 사용하여 읽은 데이터를 처리하세요.

데이터 송신/데이터 수신은 먼저 데이터 크기/수신 데이터 크기를 송신한 후 데이터 송신/수신 작업을 수행합니다.

연결 클래스의 데이터 전송/수신 기능 외에도 참조 카운팅에 사용되는 전용 변수 refno_ 변수도 참조 카운팅과 같은 매우 일반적인 프로그래밍 기술입니다. 객체 관리에 사용됩니다. 참조 횟수가 0이면 객체가 삭제됩니다. 여기서 참조 횟수도 마찬가지입니다.

<span style="color: #0000ff;">void</span><span style="color: #000000;">
connection::decref()
{
    VERIFY(pthread_mutex_lock(</span>&ref_m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    refno_ </span>--<span style="color: #000000;">;
    VERIFY(refno_</span>>=<span style="color: #800080;">0</span><span style="color: #000000;">);
    </span><span style="color: #008000;">//</span><span style="color: #008000;">当引用计数为0时,销毁对象</span>
    <span style="color: #0000ff;">if</span> (refno_==<span style="color: #800080;">0</span><span style="color: #000000;">) {
        VERIFY(pthread_mutex_lock(</span>&m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
        </span><span style="color: #0000ff;">if</span><span style="color: #000000;"> (dead_) {
            VERIFY(pthread_mutex_unlock(</span>&ref_m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
            VERIFY(pthread_mutex_unlock(</span>&m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
            </span><span style="color: #0000ff;">delete</span> <span style="color: #0000ff;">this</span><span style="color: #000000;">;
            </span><span style="color: #0000ff;">return</span><span style="color: #000000;">;
        }
        VERIFY(pthread_mutex_unlock(</span>&m_)==<span style="color: #800080;">0</span><span style="color: #000000;">);
    }
    pthread_mutex_unlock(</span>&<span style="color: #000000;">ref_m_);
}</span>

tcpscon 클래스:

이 클래스는 연결을 관리하는 데 사용됩니다. 먼저 정의를 살펴보겠습니다

<span style="color: #008000;">/*</span><span style="color: #008000;">*
 *  管理客户连接,将连接放入一个map中map<int, connection*>
 *
 </span><span style="color: #008000;">*/</span>
<span style="color: #0000ff;">class</span><span style="color: #000000;"> tcpsconn {
    </span><span style="color: #0000ff;">public</span><span style="color: #000000;">:
        tcpsconn(chanmgr </span>*m1, <span style="color: #0000ff;">int</span> port, <span style="color: #0000ff;">int</span> lossytest=<span style="color: #800080;">0</span><span style="color: #000000;">);
        </span>~<span style="color: #000000;">tcpsconn();

        </span><span style="color: #0000ff;">void</span><span style="color: #000000;"> accept_conn();
    </span><span style="color: #0000ff;">private</span><span style="color: #000000;">:

        pthread_mutex_t m_;
        pthread_t th_;
        </span><span style="color: #0000ff;">int</span> pipe_[<span style="color: #800080;">2</span><span style="color: #000000;">];

        </span><span style="color: #0000ff;">int</span> tcp_; <span style="color: #008000;">//</span><span style="color: #008000;">file desciptor for accepting connection</span>
        chanmgr *<span style="color: #000000;">mgr_;
        </span><span style="color: #0000ff;">int</span><span style="color: #000000;"> lossy_;
        std::map</span><<span style="color: #0000ff;">int</span>, connection *><span style="color: #000000;"> conns_;

        </span><span style="color: #0000ff;">void</span><span style="color: #000000;"> process_accept();
};</span>

내부에 맵이 정의되어 있는 것을 볼 수 있습니다. 맵의 키는 실제로 연결 클래스 포인터에 해당하는 소켓입니다.

tcpsconn::tcpsconn(chanmgr *m1, <span style="color: #0000ff;">int</span> port, <span style="color: #0000ff;">int</span><span style="color: #000000;"> lossytest) 
: mgr_(m1), lossy_(lossytest)
{

    VERIFY(pthread_mutex_init(</span>&m_,NULL) == <span style="color: #800080;">0</span><span style="color: #000000;">);

    </span><span style="color: #0000ff;">struct</span><span style="color: #000000;"> sockaddr_in sin;
    memset(</span>&sin, <span style="color: #800080;">0</span>, <span style="color: #0000ff;">sizeof</span><span style="color: #000000;">(sin));
    sin.sin_family </span>=<span style="color: #000000;"> AF_INET;
    sin.sin_port </span>=<span style="color: #000000;"> htons(port);

    tcp_ </span>= socket(AF_INET, SOCK_STREAM, <span style="color: #800080;">0</span><span style="color: #000000;">);
    </span><span style="color: #0000ff;">if</span>(tcp_ < <span style="color: #800080;">0</span><span style="color: #000000;">){
        perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">tcpsconn::tcpsconn accept_loop socket:</span><span style="color: #800000;">"</span><span style="color: #000000;">);
        VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">);
    }

    </span><span style="color: #0000ff;">int</span> yes = <span style="color: #800080;">1</span><span style="color: #000000;">;
    </span><span style="color: #008000;">//</span><span style="color: #008000;">设置TCP参数, reuseaddr, nodelay</span>
    setsockopt(tcp_, SOL_SOCKET, SO_REUSEADDR, &yes, <span style="color: #0000ff;">sizeof</span><span style="color: #000000;">(yes));
    setsockopt(tcp_, IPPROTO_TCP, TCP_NODELAY, </span>&yes, <span style="color: #0000ff;">sizeof</span><span style="color: #000000;">(yes));

    </span><span style="color: #0000ff;">if</span>(bind(tcp_, (sockaddr *)&sin, <span style="color: #0000ff;">sizeof</span>(sin)) < <span style="color: #800080;">0</span><span style="color: #000000;">){
        perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">accept_loop tcp bind:</span><span style="color: #800000;">"</span><span style="color: #000000;">);
        VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">);
    }

    </span><span style="color: #0000ff;">if</span>(listen(tcp_, <span style="color: #800080;">1000</span>) < <span style="color: #800080;">0</span><span style="color: #000000;">) {
        perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">tcpsconn::tcpsconn listen:</span><span style="color: #800000;">"</span><span style="color: #000000;">);
        VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">);
    }

    jsl_log(JSL_DBG_2, </span><span style="color: #800000;">"</span><span style="color: #800000;">tcpsconn::tcpsconn listen on %d %d\n</span><span style="color: #800000;">"</span><span style="color: #000000;">, port, 
        sin.sin_port);

    </span><span style="color: #0000ff;">if</span> (pipe(pipe_) < <span style="color: #800080;">0</span><span style="color: #000000;">) {
        perror(</span><span style="color: #800000;">"</span><span style="color: #800000;">accept_loop pipe:</span><span style="color: #800000;">"</span><span style="color: #000000;">);
        VERIFY(</span><span style="color: #800080;">0</span><span style="color: #000000;">);
    }

    </span><span style="color: #0000ff;">int</span> flags = fcntl(pipe_[<span style="color: #800080;">0</span><span style="color: #000000;">], F_GETFL, NULL);
    flags </span>|=<span style="color: #000000;"> O_NONBLOCK;
    fcntl(pipe_[</span><span style="color: #800080;">0</span>], F_SETFL, flags);  <span style="color: #008000;">//</span><span style="color: #008000;">无阻塞管道</span>
<span style="color: #000000;">
    VERIFY((th_ </span>= method_thread(<span style="color: #0000ff;">this</span>, <span style="color: #0000ff;">false</span>, &tcpsconn::accept_conn)) != <span style="color: #800080;">0</span><span style="color: #000000;">); 
}</span>
코드 보기

이 생성자는 주로 서버 측 연결을 초기화한 후 클라이언트 연결을 기다리는 스레드를 생성합니다. 나중에 클라이언트 연결을 처리할 때 연결된 클라이언트 소켓이 conns_의 맵, 즉 소켓에 추가됩니다. 연결 포인터 사이의 해당 관계를 수신한 다음 conns_를 통과하여 죽은 연결을 지워서 적시에 죽은 연결을 처리하는 효과를 얻습니다.

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.