가장 먼저 할 일은 프로세서를 등록하는 것입니다.
루프 수신 포트를 열면 연결이 모니터링될 때마다 고루틴이 생성됩니다. 루프를 통해 요청 데이터를 수신한 다음 요청된 주소에 따라 프로세서 라우팅 테이블의 해당 프로세서를 일치시킨 다음 처리를 위해 프로세서에 요청을 전달합니다.
코드로 표현하면 다음과 같습니다.
func (srv *Server) Serve(l net.Listener) error { ... baseCtx := context.Background() ctx := context.WithValue(baseCtx, ServerContextKey, srv) for { // 接收 listener 过来的网络连接 rw, err := l.Accept() ... tempDelay = 0 c := srv.newConn(rw) c.setState(c.rwc, StateNew) // 创建协程处理连接 go c.serve(connCtx) } }
예: accept는 acceptTCPHandler 이벤트 핸들러에 해당하고, 읽기 및 쓰기는 readQueryFromClient 이벤트 핸들러에 해당하며, 그런 다음 이벤트는 이벤트 루프 디스패치를 통해 처리하기 위해 이벤트 프로세서에 할당됩니다.
그래서 위의 Reactor 모드는 epoll을 통해 구현됩니다. epoll의 경우 크게 세 가지 방법이 있습니다.
//创建一个epoll的句柄,size用来告诉内核这个监听的数目一共有多大 int epoll_create(int size); /* * 可以理解为,增删改 fd 需要监听的事件 * epfd 是 epoll_create() 创建的句柄。 * op 表示 增删改 * epoll_event 表示需要监听的事件,Redis 只用到了可读,可写,错误,挂断 四个状态 */ int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); /* * 可以理解为查询符合条件的事件 * epfd 是 epoll_create() 创建的句柄。 * epoll_event 用来存放从内核得到事件的集合 * maxevents 获取的最大事件数 * timeout 等待超时时间 */ int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);
그래서 이 세 가지 방법을 기반으로 간단한 서버를 구현할 수 있습니다.
// 创建监听 int listenfd = ::socket(); // 绑定ip和端口 int r = ::bind(); // 创建 epoll 实例 int epollfd = epoll_create(xxx); // 添加epoll要监听的事件类型 int r = epoll_ctl(..., listenfd, ...); struct epoll_event* alive_events = static_cast<epoll_event*>(calloc(kMaxEvents, sizeof(epoll_event))); while (true) { // 等待事件 int num = epoll_wait(epollfd, alive_events, kMaxEvents, kEpollWaitTime); // 遍历事件,并进行事件处理 for (int i = 0; i < num; ++i) { int fd = alive_events[i].data.fd; // 获取事件 int events = alive_events[i].events; // 进行事件的分发 if ( (events & EPOLLERR) || (events & EPOLLHUP) ) { ... } else if (events & EPOLLRDHUP) { ... } ... } }
호출 프로세스 #
그래서 위의 방법을 기반으로 합니다. 소개에서 Redis의 경우 이벤트 루프는 몇 단계에 불과하다는 것을 알 수 있습니다.
이벤트 모니터링 및 콜백 기능을 등록합니다.
이벤트를 획득하고 처리하기 위해 대기하는 루프
fd를 epoll에 등록하고 콜백 함수 acceptTcpHandler를 설정합니다.
무한 루프를 시작하여 epoll_wait를 호출하고 이벤트를 계속 처리합니다. 나중에 aeProcessEvents 함수를 루프하기 위해 aeMain 함수로 돌아갑니다. acceptTcpHandler는 데이터를 처리합니다. readQueryFromClient는 클라이언트의 데이터를 구문 분석하고 실행할 해당 cmd 함수를 찾습니다.
processInputBuffer 함수는 요청된 명령을 루프에서 처리하고 요청된 프로토콜에 따라 processInlineBuffer 함수를 호출한 다음 processCommand를 호출하여 redisObject 객체 뒤에 명령을 실행합니다.
processCommand는 명령을 실행할 때 전달됩니다.lookupCommand는server.commands
테이블로 이동하여 명령을 기반으로 해당 실행 함수를 찾은 다음 일련의 확인 후 해당 함수를 호출하여 실행합니다. 명령을 실행하고 반환된 데이터를 클라이언트 출력 버퍼에 쓰기 위해 addReply를 호출합니다. Area
server.commands
는 populateCommandTable 함수의 모든 Redis 명령을 기반으로 명령 기능을 얻는 테이블로 등록합니다. 명령 이름.
void getCommand(client *c) { getGenericCommand(c); } int getGenericCommand(client *c) { robj *o; // 查找数据 if ((o = lookupKeyReadOrReply(c,c->argv[1],shared.nullbulk)) == NULL) return C_OK; ... } robj *lookupKeyReadOrReply(client *c, robj *key, robj *reply) { //到db中查找数据 robj *o = lookupKeyRead(c->db, key); // 写入到缓存中 if (!o) addReply(c,reply); return o; }
getCommand 함수에서 데이터를 찾은 다음 addReply를 호출하여 반환된 데이터를 클라이언트 출력 버퍼에 씁니다.
버퍼에 명령을 쓴 후 데이터를 버퍼에서 꺼내 클라이언트로 반환해야 합니다. 클라이언트에 데이터를 다시 쓰는 과정은 실제로 서버의 이벤트 루프에서 완료됩니다.
먼저 Redis는 메인 함수에서 aeSetBeforeSleepProc 함수를 호출하여 쓰기 저장 패키지의 beforeSleep 함수를 eventLoop에 등록합니다.
그런 다음 Redis는 aeMain 함수를 호출할 때 beforesleep이 존재하는지 확인합니다. 이벤트 루프가 설정되어 있으면
beforesleep 함수가 writeToClient를 호출하여 버퍼에서 클라이언트에 데이터를 다시 쓰는 함수를 호출합니다.
위 내용은 Redis 요청 처리 과정은 어떻게 되나요?의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!