Rumah >pangkalan data >Redis >Satu artikel untuk memahami pemprosesan acara analisis reka bentuk kod sumber Redis

Satu artikel untuk memahami pemprosesan acara analisis reka bentuk kod sumber Redis

WBOY
WBOYke hadapan
2022-10-03 09:00:282501semak imbas

Artikel ini membawakan kepada anda isu berkaitan tentang redis terutamanya tentang kandungan yang berkaitan tentang contoh pemprosesan acara, termasuk pengenalan acara redis, abstraksi acara dan pelaksanaan acara.

Satu artikel untuk memahami pemprosesan acara analisis reka bentuk kod sumber Redis

Pembelajaran yang disyorkan: Tutorial video Redis

1 Pengenalan kepada acara Redis

Pelayan Redis ialah事件驱动程序, yang dipanggil didorong oleh peristiwa adalah untuk memasukkan perintah dan tekan Enter, dan kemudian mesej itu dihimpunkan ke dalam format protokol Redis dan dihantar ke pelayan Redis Pada masa ini, acara akan dijana , dan pelayan Redis akan menerima arahan yang diubah Proses arahan dan hantar balasan, dan apabila kami tidak berinteraksi dengan pelayan, pelayan akan berada dalam keadaan menunggu yang menyekat, ia akan menyerahkan CPU dan kemudian pergi. untuk tidur, dan apabila acara dicetuskan, ia akan dibangunkan oleh sistem pengendalian

Pelayan Redis perlu mengendalikan dua jenis acara berikut:

文件事件: The. Pelayan Redis menyambung kepada klien (atau pelayan Redis lain) melalui soket, dan peristiwa fail adalah tindak balas pelayan kepada soket Abstraksi operasi Komunikasi antara pelayan dan klien (atau pelayan lain) akan menjana peristiwa fail yang sepadan. dan pelayan akan melengkapkan satu siri operasi komunikasi rangkaian dengan mendengar dan memproses acara ini

时间事件: Redis Beberapa operasi dalam pelayan (seperti fungsi serverCron) perlu dilaksanakan pada masa tertentu. , dan peristiwa masa adalah abstraksi pelayan bagi operasi pemasaan tersebut.

2 Abstraksi peristiwa

Redis Abstract 文件事件 dan 时间事件 masing-masing ke dalam struktur data untuk pengurusan

2.1 Struktur acara fail

typedef struct aeFileEvent {
    // 文件时间类型:AE_NONE,AE_READABLE,AE_WRITABLE
    int mask;
    // 可读处理函数
    aeFileProc *rfileProc;
    // 可写处理函数
    aeFileProc *wfileProc;
    // 客户端传入的数据
    void *clientData;
} aeFileEvent;  //文件事件
di mana ahli

dan rfileProc masing-masing adalah dua fungsi Penunjuk, prototaipnya ialah: wfileProc

typedef void aeFileProc(struct aeEventLoop *eventLoop, int fd, void *clientData, int mask);
Fungsinya ialah

. Jika jenis acara yang ditentukan oleh peristiwa fail semasa berlaku, 回调函数 yang sepadan akan dipanggil untuk mengendalikan acara.回调函数

Apabila acara sudah sedia, kita perlu mengetahui deskriptor fail dan jenis acara acara fail untuk mengunci acara, jadi struktur

ditakrifkan untuk pengurusan bersatu: aeFiredEvent

typedef struct aeFiredEvent {
    // 就绪事件的文件描述符
    int fd;
    // 就绪事件类型:AE_NONE,AE_READABLE,AE_WRITABLE
    int mask;
} aeFiredEvent; //就绪事件
Jenis acara fail:

#define AE_NONE 0           //未设置
#define AE_READABLE 1       //事件可读
#define AE_WRITABLE 2       //事件可写
2.2 Struktur acara masa

typedef struct aeTimeEvent {
    // 时间事件的id
    long long id;
    // 时间事件到达的时间的秒数
    long when_sec; /* seconds */
    // 时间事件到达的时间的毫秒数
    long when_ms; /* milliseconds */
    // 时间事件处理函数
    aeTimeProc *timeProc;
    // 时间事件终结函数
    aeEventFinalizerProc *finalizerProc;
    // 客户端传入的数据
    void *clientData;
    // 指向下一个时间事件
    struct aeTimeEvent *next;
} aeTimeEvent;  //时间事件
Dapat dilihat bahawa struktur peristiwa masa ialah nod senarai terpaut, kerana

menunjuk ke Penunjuk acara masa seterusnya.struct aeTimeEvent *next

adalah sama dengan acara fail . Apabila peristiwa yang ditentukan oleh peristiwa masa berlaku,

yang sepadan juga akan dipanggil Ahli struktur 回调函数 dan timeProc kedua-duanya adalah fungsi panggil balik: finalizerProc

typedef int aeTimeProc(struct aeEventLoop *eventLoop, long long id, void *clientData);
typedef void aeEventFinalizerProc(struct aeEventLoop *eventLoop, void *clientData);
Walaupun acara fail dan peristiwa masa telah disarikan,

masih perlu membuat abstraksi keseluruhan acara untuk menerangkan status sesuatu peristiwa, iaitu, struktur acara akan diperkenalkan di bawah.RedisaeEventLoop2.3 Struktur status acara

typedef struct aeEventLoop {
    // 当前已注册的最大的文件描述符
    int maxfd;   /* highest file descriptor currently registered */
    // 文件描述符监听集合的大小
    int setsize; /* max number of file descriptors tracked */
    // 下一个时间事件的ID
    long long timeEventNextId;
    // 最后一次执行事件的时间
    time_t lastTime;     /* Used to detect system clock skew */
    // 注册的文件事件表
    aeFileEvent *events; /* Registered events */
    // 已就绪的文件事件表
    aeFiredEvent *fired; /* Fired events */
    // 时间事件的头节点指针
    aeTimeEvent *timeEventHead;
    // 事件处理开关
    int stop;
    // 多路复用库的事件状态数据
    void *apidata; /* This is used for polling API specific data */
    // 执行处理事件之前的函数
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;  //事件轮询的状态结构
struktur menyimpan penunjuk universal

jenis aeEventLoop, digunakan untuk menyimpan status acara pengundian, iaitu untuk menyimpan status acara perpustakaan pemultipleksan yang dipanggil oleh lapisan asas Semua fungsi pemultipleks void * apidata

dibalut oleh Redis, I/O, select dan epoll biasa. evport Pustaka fungsi pemultipleksan setiap pustaka fungsi pemultipleksan I/O sepadan dengan fail yang berasingan dalam kod sumber Redis, seperti kqueue , I/O dan sebagainya. Semasa fasa penyusunan, mereka akan memilih perpustakaan pemultipleksan prestasi tertinggi mengikut sistem yang berbeza sebagai pelaksanaan program pemultipleksan ae_select.c, dan API semua pustaka Ia adalah sama, yang membolehkan lapisan bawah program pemultipleksan Redis boleh ditukar ganti ae_epoll.cBerikut ialah kod sumber untuk perpustakaan pemilihan tertentu:

RedisAnda juga boleh menggunakan arahan

untuk menyemak perpustakaan pemultipleksan yang sedang digunakan:

// IO复用的选择,性能依次下降,Linux支持 "ae_epoll.c" 和 "ae_select.c"
#ifdef HAVE_EVPORT
#include "ae_evport.c"
#else
    #ifdef HAVE_EPOLL
    #include "ae_epoll.c"
    #else
        #ifdef HAVE_KQUEUE
        #include "ae_kqueue.c"
        #else
        #include "ae_select.c"
        #endif
    #endif
#endif

INFO server Anda boleh melihat bahawa pustaka pemultipleksan lalai di bawah Linux ialah pustaka pemultipleksan

, kemudian

Apa yang disimpan ialah struktur status peristiwa bagi model , yang terdapat dalam

fail sumber:

epollapidataepoll Jenis acara epoll ditakrifkan dalam struktur ae_epoll.c model

, seperti
typedef struct aeApiState {
    // epoll事件的文件描述符
    int epfd;
    // 事件表
    struct epoll_event *events;
} aeApiState;   // 事件的状态
,

, dsb., tetapi struktur fail Redis epoll juga mentakrifkan jenis acaranya sendiri dalam struct epoll_event, seperti: EPOLLIN, EPOLLOUT, dsb., jadi lapisan tengah perlu dilaksanakan Memautkan dua jenis acara, ini adalah API yang sama dilaksanakan dalam fail aeFileEvent yang dinyatakan sebelum ini: mask

// 创建一个epoll实例,保存到eventLoop中
static int aeApiCreate(aeEventLoop *eventLoop)
// 调整事件表的大小
static int aeApiResize(aeEventLoop *eventLoop, int setsize)  
// 释放epoll实例和事件表空间
static void aeApiFree(aeEventLoop *eventLoop)
// 在epfd标识的事件表上注册fd的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask)
// 在epfd标识的事件表上注删除fd的事件
static void aeApiDelEvent(aeEventLoop *eventLoop, int fd, int delmask)
// 等待所监听文件描述符上有事件发生
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp)
// 返回正在使用的IO多路复用库的名字
static char *aeApiName(void)

这些API会讲epoll的底层函数封装起来,Redis实现事件时,只需要调用这些接口即可.

我们以下面两个API的源码举例:

aeApiAddEvent

该函数会向Redis事件状态结构aeEventLoop的事件表event注册一个事件,对应的是epoll_ctl函数.

// 在epfd标识的事件表上注册fd的事件
static int aeApiAddEvent(aeEventLoop *eventLoop, int fd, int mask) {
    aeApiState *state = eventLoop->apidata;
    struct epoll_event ee = {0};
    // EPOLL_CTL_ADD,向epfd注册fd的上的event
    // EPOLL_CTL_MOD,修改fd已注册的event
    // #define AE_NONE 0           //未设置
    // #define AE_READABLE 1       //事件可读
    // #define AE_WRITABLE 2       //事件可写
    // 判断fd事件的操作,如果没有设置事件,则进行关联mask类型事件,否则进行修改
    int op = eventLoop->events[fd].mask == AE_NONE ?
            EPOLL_CTL_ADD : EPOLL_CTL_MOD;
    // struct epoll_event {
    //      uint32_t     events;      /* Epoll events */
    //      epoll_data_t data;        /* User data variable */
    // };
    ee.events = 0;
    // 如果是修改事件,合并之前的事件类型
    mask |= eventLoop->events[fd].mask; /* Merge old events */
    // 根据mask映射epoll的事件类型
    if (mask & AE_READABLE) ee.events |= EPOLLIN;   //读事件
    if (mask & AE_WRITABLE) ee.events |= EPOLLOUT;  //写事件
    ee.data.fd = fd;    //设置事件所从属的目标文件描述符
    // 将ee事件注册到epoll中
    if (epoll_ctl(state->epfd,op,fd,&ee) == -1) return -1;
    return 0;
}

aeApiPoll

等待所监听文件描述符上有事件发生,对应着底层的epoll_wait函数.

// 等待所监听文件描述符上有事件发生
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;
    // 监听事件表上是否有事件发生
    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    // 至少有一个就绪的事件
    if (retval > 0) {
        int j;
        numevents = retval;
        // 遍历就绪的事件表,将其加入到eventLoop的就绪事件表中
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;
            // 根据就绪的事件类型,设置mask
            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            // 添加到就绪事件表中
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    // 返回就绪的事件个数
    return numevents;
}

3. 事件的实现

事件的所有源码都定义在ae.c源文件中,先从aeMain函数说起.

// 事件轮询的主函数
void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    // 一直处理事件
    while (!eventLoop->stop) {
        // 执行处理事件之前的函数
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        //处理到时的时间事件和就绪的文件事件
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

可以看到,如果服务器一直处理事件,那么就是一个死循环,而一个最典型的事件驱动,就是一个死循环. 在循环中,程序会调用处理事件的函数aeProcessEvents(),它的参数是一个事件状态结构aeEventLoopAE_ALL_EVENTS.

事件类型的宏定义,在ae.h头文件中:

#define AE_FILE_EVENTS 1                                //文件事件
#define AE_TIME_EVENTS 2                                //时间事件
#define AE_ALL_EVENTS (AE_FILE_EVENTS|AE_TIME_EVENTS)   //文件和时间事件
#define AE_DONT_WAIT 4
// 处理到时的时间事件和就绪的文件事件
// 如果flags = 0,函数什么都不做,直接返回
// 如果flags设置了 AE_ALL_EVENTS ,则执行所有类型的事件
// 如果flags设置了 AE_FILE_EVENTS ,则执行文件事件
// 如果flags设置了 AE_TIME_EVENTS ,则执行时间事件
// 如果flags设置了 AE_DONT_WAIT ,那么函数处理完事件后直接返回,不阻塞等待
// 函数返回执行的事件个数
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
{
    int processed = 0, numevents;
    // 如果什么事件都没有设置则直接返回
    if (!(flags & AE_TIME_EVENTS) && !(flags & AE_FILE_EVENTS)) return 0;
    // 请注意,既然我们要处理时间事件,即使没有要处理的文件事件,我们仍要调用select(),以便在下一次事件准备启动之前进行休眠
    // 当前还没有要处理的文件事件,或者设置了时间事件但是没有设置不阻塞标识
    if (eventLoop->maxfd != -1 ||
        ((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
        int j;
        aeTimeEvent *shortest = NULL;
        struct timeval tv, *tvp;
        // 如果设置了时间事件而没有设置不阻塞标识
        if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
            // 获取最近到时的时间事件
            shortest = aeSearchNearestTimer(eventLoop);
        // 获取到了最早到时的时间事件
        if (shortest) {
            long now_sec, now_ms;
            // 获取当前时间
            aeGetTime(&now_sec, &now_ms);
            tvp = &tv;
            // 等待该时间事件到时所需要的时长
            long long ms =
                (shortest->when_sec - now_sec)*1000 +
                shortest->when_ms - now_ms;
            // 如果没到时
            if (ms > 0) {
                // 保存时长到tvp中
                tvp->tv_sec = ms/1000;
                tvp->tv_usec = (ms % 1000)*1000;
            // 如果已经到时,则将tvp的时间设置为0
            } else {
                tvp->tv_sec = 0;
                tvp->tv_usec = 0;
            }
        // 没有获取到了最早到时的时间事件,时间事件链表为空
        } else {
            // 如果设置了不阻塞标识
            if (flags & AE_DONT_WAIT) {
                // 将tvp的时间设置为0,就不会阻塞
                tv.tv_sec = tv.tv_usec = 0;
                tvp = &tv;
            } else {
                // 阻塞到第一个时间事件的到来
                /* Otherwise we can block */
                tvp = NULL; /* wait forever */
            }
        }
        // 等待所监听文件描述符上有事件发生
        // 如果tvp为NULL,则阻塞在此,否则等待tvp设置阻塞的时间,就会有时间事件到时
        // 返回了就绪文件事件的个数
        numevents = aeApiPoll(eventLoop, tvp);
        // 遍历就绪文件事件表
        for (j = 0; j < numevents; j++) {
            // 获取就绪文件事件的地址
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            // 获取就绪文件事件的类型,文件描述符
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int rfired = 0;
            // 如果是文件可读事件发生
            if (fe->mask & mask & AE_READABLE) {
                // 设置读事件标识 且 调用读事件方法处理读事件
                rfired = 1;
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
            }
            // 如果是文件可写事件发生
            if (fe->mask & mask & AE_WRITABLE) {
                // 读写事件的执行发法不同,则执行写事件,避免重复执行相同的方法
                if (!rfired || fe->wfileProc != fe->rfileProc)
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
            }
            processed++;    //执行的事件次数加1
        }
    }
    /* Check time events */
    // 执行时间事件
    if (flags & AE_TIME_EVENTS)
        processed += processTimeEvents(eventLoop);
    return processed; /* return the number of processed file/time events */
}

Redis服务器在没有被事件触发时,如果没有设置AE_DONT_WAIT标识,就会开始阻塞等待. 但是它不会死等待,因为还需要处理时间事件,所以在调用aeApiPoll进行监听之前,会先从时间事件表中获取一个最近到达的时间,根据需要等待的时间构建一个struct timeval tv, *tvp结构的变量,这个变量保存着服务器阻塞等待文件事件的最长时间,一旦时间到达而没有触发文件事件aeApiPoll函数就会停止阻塞,进而调用processTimeEvents函数处理时间事件.

如果在阻塞等待的最长时间之间,触发了文件事件,就会先执行文件事件,后执行时间事件,因此处理时间事件通常比预设的会晚一点.

而执行文件事件rfileProcwfileProc也是调用了回调函数,Redis将文件事件的处理分为了好几种,用于处理不同的网络通信需求:

  • acceptTcpHandler:用于accept client的connect.
  • acceptUnixHandler:用于accept client的本地connect.
  • sendReplyToClient:用于向client发送命令回复.
  • readQueryFromClient:用于读入client发送的请求.

然后我们来看一下获取最快达到时间事件的函数aeSearchNearestTimer实现:

// 寻找第一个快到时的时间事件
// 这个操作是有用的知道有多少时间可以选择该事件设置为不用推迟任何事件的睡眠中。
// 如果事件链表没有时间将返回NULL。
static aeTimeEvent *aeSearchNearestTimer(aeEventLoop *eventLoop)
{
    // 时间事件头节点地址
    aeTimeEvent *te = eventLoop->timeEventHead;
    aeTimeEvent *nearest = NULL;
    // 遍历所有的时间事件
    while(te) {
        // 寻找第一个快到时的时间事件,保存到nearest中
        if (!nearest || te->when_sec < nearest->when_sec ||
                (te->when_sec == nearest->when_sec &&
                 te->when_ms < nearest->when_ms))
            nearest = te;
        te = te->next;
    }
    return nearest;
}

该函数就是遍历时间事件链表,然后找到最小值.

我们重点看执行时间事件的函数processTimeEvents函数的实现:

// 执行时间事件
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te, *prev;
    long long maxId;
    time_t now = time(NULL);
    // 这里尝试发现时间混乱的情况,上一次处理事件的时间比当前时间还要大
    // 重置最近一次处理事件的时间
    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    // 设置上一次时间事件处理的时间为当前时间
    eventLoop->lastTime = now;
    prev = NULL;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;   //当前时间事件表中的最大ID
    // 遍历时间事件链表
    while(te) {
        long now_sec, now_ms;
        long long id;
        /* Remove events scheduled for deletion. */
        // 如果时间事件已被删除了
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            // 从事件链表中删除事件的节点
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            // 调用时间事件终结方法清除该事件
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }
        // 确保我们不处理在此迭代中由时间事件创建的时间事件. 请注意,此检查目前无效:我们总是在头节点添加新的计时器,但是如果我们更改实施细节,则该检查可能会再次有用:我们将其保留在未来的防御
        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        // 获取当前时间
        aeGetTime(&now_sec, &now_ms);
        // 找到已经到时的时间事件
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;
            id = te->id;
            // 调用时间事件处理方法
            retval = te->timeProc(eventLoop, id, te->clientData);
            // 时间事件次数加1
            processed++;
            // 如果不是定时事件,则继续设置它的到时时间
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            // 如果是定时时间,则retval为-1,则将其时间事件删除,惰性删除
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        // 更新前驱节点指针和后继节点指针
        prev = te;
        te = te->next;
    }
    return processed;   //返回执行事件的次数
}

如果时间事件不存在,则就调用finalizerProc指向的回调函数,删除当前的时间事件. 如果存在,就调用timeProc指向的回调函数处理时间事件. Redis的时间事件分为两类:

  • 定时事件:让一段程序在指定的时间后执行一次.
  • 周期性事件:让一段程序每隔指定的时间后执行一次.

如果当前的时间事件是周期性,那么就会在将时间周期添加到周期事件的到时时间中. 如果是定时事件,则将该时间事件删除.

推荐学习:Redis视频教程

Atas ialah kandungan terperinci Satu artikel untuk memahami pemprosesan acara analisis reka bentuk kod sumber Redis. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Artikel ini dikembalikan pada:jb51.net. Jika ada pelanggaran, sila hubungi admin@php.cn Padam