C 언어 Raft 구현(코드 포함)
1. 소개
이 기사에서는 간단한 Raft 구현을 소개합니다. Raft 논문을 읽었다면 이 Raft 구현을 더 쉽게 읽을 수 있을 것입니다. 왜냐하면 구현에 대한 세부 사항은 Raft 논문에 아주 자세하게 설명되어 있기 때문입니다. 엔지니어링 구현은 기본적으로 Raft 논문의 설명을 다시 표현하는 것입니다. 프로그래밍 언어로. 이것이 Paxos에 비해 Raft의 가장 큰 장점입니다. 즉, 이해하고 구현하기 쉽다는 것입니다. 이 글에서 소개하는 Raft 구현은 C 언어로 코딩되어 있으며, 로그 압축 기능을 제외하고 다른 기능도 구현되어 있습니다. 멤버 변경 메커니즘도 비교적 간단하며 한 번에 하나의 구성 변경만 지원합니다. Raft의 원리에 대해서는 Raft 논문과 "Raft Understanding"을 읽을 수 있습니다.
2. Raft의 기본 개념
2.1 상태
raft에는 리더, 후보 및 추종자의 세 가지 상태가 있습니다. 이 세 가지 상태 사이의 전환은 아래 그림에 나와 있습니다. 리더만이 고객 요청을 처리하고 로그를 팔로워에게 복사할 수 있는 권한을 갖습니다. 후보자는 클러스터에 리더가 없는 경우 후보자 상태로 들어가 클러스터에 대한 투표를 시작합니다.
2.2 메시지
Raft 프로토콜의 이해도를 높이기 위해 메시지 유형을 설정하고 단순화하였으며, 요청은 다음 두 가지만 있습니다. [indent]
requestVote 투표 요청을 시작합니다. 투표를 시작할 때 후보자의 요청입니다. 클러스터의 다른 추종자와 후보자가 이를 수신하고 처리합니다.
appendEntries 로그 요청을 추가합니다. Follower에 로그를 추가할 때 Leader가 발행한 요청입니다. [/indent]
2.3 용어 번호
Raft 프로토콜은 용어 번호를 사용하여 이전 및 새 시간 관계를 나타냅니다. 이 용어 값은 각 리더의 임기 동안 변경되지 않으며 다른 리더와 완전히 다릅니다. 시간이 지남에 따라 단조롭게 증가합니다. 요청 A의 기간이 요청 B의 기간보다 긴 경우 요청 B는 오래된 것입니다.
3. Raft 구현
3.1 프로토콜
먼저 위에서 언급한 requestVote 및 AppendEntries 요청 및 응답에 해당하는 네 가지 중요한 데이터 구조를 소개합니다.
/** requestVote 请求投票 * 竞选者Candidate去竞选Leader时发送给其它node的投票请求。 * 其它Leader或者Candidate收到term比自己大的投票请求时,会自动变成Follower*/ typedef struct { /** 当前任期号,通过任期号的大小与其它Candidate竞争Leader */ int term; /** 竞选者的id */ int candidate_id; /** 竞选者本地保存的最新一条日志的index */ int last_log_idx; /** 竞选者本地保存的最新一条日志的任期号*/ int last_log_term; } msg_requestvote_t; /** 投票请求的回复response. * 该response主要是给返回某个node是否接收了Candidate的投票请求. */ typedef struct { /** node的任期号,Candidate根据投票结果和node的任期号来更新自己的任期号 */ int term; /** 投票结果,如果node给Candidate投票则为true */ int vote_granted; } msg_requestvote_response_t; /** 添加日志请求. * Follower可以从该消息中知道哪些日志可以安全地提交到状态机FSM中去。 * Leader可以将该消息作为心跳消息定期发送。 * 旧的Leader和Candidate收到该消息后可能会自动变成Follower */ typedef struct { /** Leader当前的任期号 */ int term; /** 最新日志的前一条日志的index,用于Follower确认与Leader的日志完全一致 */ int prev_log_idx; /** 最新日志的前一条日志的任期号term */ int prev_log_term; /** leader当前已经确认提交到状态机FSM的日志索引index,这意味着Follower也可以安全地将该索引index以前的日志提交 */ int leader_commit; /** 这条添加日志消息携带的日志条数,该实现中最多只有一条 */ int n_entries; /** 这条添加日志消息中携带的日志数组 */ msg_entry_t* entries; } msg_appendentries_t; /** 添加日志回复. * 旧的Leader或Candidate收到该消息会变成Follower */ typedef struct { /** 当前任期号 */ int term; /** node成功添加日志时返回ture,即prev_log_index和prev_log_term都比对成功。否则返回false */ int success; /* 下面两个字段不是Raft论文中规定的字段: /* 用来优化日志追加过程,以加速日志的追加。Raft原文中的追加过程是一次只能追加一条日志*/ /** 处理添加日志请求后本地的最大日志索引 */ int current_idx; /** 从添加日志请求中接受的第一条日志索引 */ int first_idx; } msg_appendentries_response_t;
3.2 두 가지 중요한 추상화
raft_server_private_t 이 구조는 Raft 프로토콜 작동 중에 필요한 상태와 모든 데이터를 저장하는 Raft 구현의 추상화입니다.
typedef struct { /* 所有服务器比较固定的状态: */ /* 服务器最后一次知道的任期号(初始化为 0,持续递增) */ int current_term; /* 记录在当前分期内给哪个Candidate投过票, */ int voted_for; /* 日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号 */ void* log; /* 变动比较频繁的变量: */ /* 已知的最大的已经被提交的日志条目的索引值 */ int commit_idx; /* 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增) */ int last_applied_idx; /* 三种状态:follower/leader/candidate */ int state; /* 计时器,周期函数每次执行时会递增改值 */ int timeout_elapsed; raft_node_t* nodes; int num_nodes; int election_timeout; int request_timeout; /* 保存Leader的信息,没有Leader时为NULL */ raft_node_t* current_leader; /* callbacks,由调用该raft实现的调用者来实现,网络IO和持久存储 * 都由调用者在callback中实现 */ raft_cbs_t cb; void* udata; /* 自己的信息 */ raft_node_t* node; /* 该raft实现每次只进行一个服务器的配置更改,该变量记录raft server * 是否正在进行配置更改*/ int voting_cfg_change_log_idx; } raft_server_private_t;
raft_node_private_t raft 프로토콜 실행 중에 저장해야 하는 다른 시스템에 대한 정보를 포함하여 클러스터에 있는 시스템 노드의 추상 본문
typedef struct { void* udata; /*一般保存与其它机器的连接信息,由使用者决定怎么实现连接*/ int next_idx; /*对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加一)*/ int match_idx; /*对于每一个服务器,已经复制给他的日志的最高索引值*/ int flags; /*有三种取值,是相或的关系 1:该机器有给我投票 2:该机器有投票权 3: 该机器有最新的日志*/ int id; /*机器对应的id值,这个每台机器在全局都是唯一的*/ } raft_node_private_t;
3.3 Raft 프로토콜 프로세스
주기적 기능 Raft 예를 들어, 리더는 지연된 로그가 있는 서버가 따라잡을 수 있는 기회를 제공하기 위해 정기적으로 다른 서버에 로그를 추가해야 하며, 확인되고 제출된 로그를 상태 시스템에 주기적으로 적용해야 합니다.
raft_periodic 함수는 raft 구현에서 주기적으로 호출되는 함수이며, 호출 주기는 1000ms입니다. 기계는 이 기능에서 다양한 상태에서 다양한 작업을 수행합니다. Leader는 주기적으로 Follower에 로그를 동기화합니다. Follower는 일정 시간 내에 Leader로부터 하트비트 패킷을 받지 못했는지 주기적으로 감지하여 Candidate가 되어 Leader에게 투표를 시작합니다. 리더와 팔로어 모두 주기적으로 제출된 로그를 상태 시스템 FSM에 커밋합니다.
/** raft周期性执行的函数,实现raft中的定时器以及定期应用日志到状态机 */ int raft_periodic(raft_server_t* me_, int msec_since_last_period) { raft_server_private_t* me = (raft_server_private_t*)me_; /* 选举计时器;Follower每次收到Leader的心跳后会重置清0,Leader每次发送日志也会清0 */ me->timeout_elapsed += msec_since_last_period; /* Leader周期性地向Follower同步日志 */ if (me->state == RAFT_STATE_LEADER) { if (me->request_timeout <= me->timeout_elapsed) raft_send_appendentries_all(me_); } /* Follower检测选举计时器是否超时 */ else if (me->election_timeout <= me->timeout_elapsed) { if (1 < me->num_nodes) raft_election_start(me_); } /* 周期性地将已经确认commit的日志应用到状态机FSM */ if (me->last_applied_idx < me->commit_idx) if (-1 == raft_apply_entry(me_)) return -1; return 0; }
후보 되기 클러스터의 각 서버에는 선거 타이머가 있습니다. 서버가 타이머 제한 시간 내에 리더로부터 하트비트를 받지 못하면 클러스터에 리더가 없거나 리더가 있는 것으로 간주됩니다. down., 서버는 후보가 된 후 리더에 출마하기 위한 투표를 시작합니다. 다음 raft_become_candidate 함수는 서버가 후보가 되기 위한 함수입니다.
현재 용어를 증가시킵니다. number (currentTerm)
직접 투표하세요
선거 제한 시간 타이머 재설정
다른 모든 서버에 투표를 요청하는 RPC 보내기
/** Follower成为Candidate执行的函数 */ void raft_become_candidate(raft_server_t* me_) { raft_server_private_t* me = (raft_server_private_t*)me_; int i; /*自增当前的任期号;给自己投票,设置自己的状态为CANDIDATE*/ raft_set_current_term(me_, raft_get_current_term(me_) + 1); for (i = 0; i < me->num_nodes; i++) raft_node_vote_for_me(me->nodes[i], 0); raft_vote(me_, me->node); me->current_leader = NULL; raft_set_state(me_, RAFT_STATE_CANDIDATE); /* 重置选举超时计时器。为了防止多个Candidate竞争,将下一次发起投票的时间间隔设置成随机值*/ /* TODO: this should probably be lower */ me->timeout_elapsed = rand() % me->election_timeout; /*发送请求投票的 RPC 给其他所有服务器*/ for (i = 0; i < me->num_nodes; i++) if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i])) raft_send_requestvote(me_, me->nodes[i]); }
투표 요청 처리 투표 요청 처리 논리는 주로 다음과 같습니다. 투표 동의 여부 판단, 판단 근거 요청에 포함된 용어 번호와 로그 정보의 최신성을 의미하며, 동일한 용어 번호로 다른 서버에 투표했는지 여부를 의미하며, 투표를 했다면 투표할 수 없습니다. 다시 한 번 말하지만 각 사람은 한 표만 가지고 있습니다.
term > currentTerm이면 팔로워 모드로 전환하세요.
여기서 투표 요청을 받는 서버는 네트워크 상태가 좋지 않은 리더이거나 아직 투표 요청을 발행할 시간이 없는 후보자일 수 있습니다. 자신보다 새로운 임기로 요청을 받은 후에는 무조건 투표해야 합니다. 하나의 리더만 존재하도록 하기 위해
如果term daab63a7de571b85be82bb1122a6d0ecterm即为过时)
follower是否成功添加日志,如果添加失败,则减小发给follower的日志索引nextIndex再重试;如果添加成功则更新本地记录的follower日志信息,并检查日志是否最新,如果不是最新则继续发送添加日志请求。
新机器的日志添加,详见3.4节-- 成员变更
/** 处理添加日志请求回复 * / int raft_recv_appendentries_response(raft_server_t* me_, raft_node_t* node, msg_appendentries_response_t* r) { raft_server_private_t* me = (raft_server_private_t*)me_; __log(me_, node, "received appendentries response %s ci:%d rci:%d 1stidx:%d", r->success == 1 ? "SUCCESS" : "fail", raft_get_current_idx(me_), r->current_idx, r->first_idx); /* 过时的回复 -- 忽略 */ if (r->current_idx != 0 && r->current_idx <= raft_node_get_match_idx(node)) return 0; /* oh~我不是Leader */ if (!raft_is_leader(me_)) return -1; /* 回复中的term比自己的要大,说明自己是一个过时的Leader,无条件转为Follower */ if (me->current_term < r->term) { raft_set_current_term(me_, r->term); raft_become_follower(me_); return 0; } /* 过时的回复,网络状况不好时会出现 */ else if (me->current_term != r->term) return 0; /* stop processing, this is a node we don't have in our configuration */ if (!node) return 0; /* 由于日志不一致导致添加日志不成功*/ if (0 == r->success) { assert(0 <= raft_node_get_next_idx(node)); /* 将nextIdex减*/ int next_idx = raft_node_get_next_idx(node); assert(0 <= next_idx); /* Follower的日志数量还远远少于Leader,将nextIdex设为回复中的current_idx+1和Leader * 当前索引中较小的一个,一般回复中的current_idx+1会比较小*/ if (r->current_idx < next_idx - 1) raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_))); /* Follower的日志数量和Leader差不多,但是比对前一条日志时失败,这种情况将next_idx减1 * 重试*/ else raft_node_set_next_idx(node, next_idx - 1); /* 使用更新后的nextIdx重新发送添加日志请求 */ raft_send_appendentries(me_, node); return 0; } assert(r->current_idx <= raft_get_current_idx(me_)); /* 下面处理添加日志请求的情况 */ /* 更新本地记录的Follower的日志情况 */ raft_node_set_next_idx(node, r->current_idx + 1); raft_node_set_match_idx(node, r->current_idx); /* 如果是新加入的机器,则判断它的日志是否是最新,如果达到了最新,则赋予它投票权, * 这里逻辑的详细解释在第3.4节 -- 成员变更*/ if (!raft_node_is_voting(node) && -1 == me->voting_cfg_change_log_idx && raft_get_current_idx(me_) <= r->current_idx + 1 && me->cb.node_has_sufficient_logs && 0 == raft_node_has_sufficient_logs(node) ) { raft_node_set_has_sufficient_logs(node); me->cb.node_has_sufficient_logs(me_, me->udata, node); } /* 如果一条日志回复成功的数量超过一半,则将日志提交commit,即允许应用到状态机 */ int votes = 1; /* include me */ int point = r->current_idx; int i; for (i = 0; i < me->num_nodes; i++) { if (me->node == me->nodes[i] || !raft_node_is_voting(me->nodes[i])) continue; int match_idx = raft_node_get_match_idx(me->nodes[i]); if (0 < match_idx) { raft_entry_t* ety = raft_get_entry_from_idx(me_, match_idx); /*如果follower已经添加了索引大于等于r->current_idx的日志,则vote加1*/ if (ety->term == me->current_term && point <= match_idx) votes++; } } /* 投票数大于所有服务器的一半,则将日志提交 */ if (me->num_nodes / 2 < votes && raft_get_commit_idx(me_) < point) raft_set_commit_idx(me_, point); /* 如果follower的日志还没有最新,那么继续发送添加日志请求 */ if (raft_get_entry_from_idx(me_, raft_node_get_next_idx(node))) raft_send_appendentries(me_, node); /* periodic applies committed entries lazily */ return 0; }
3.3 成员变更
成员的变更都是以日志的形式下发的。添加的新成员分两阶段进行,第一阶段中新成员没有有投票权,但是有接收日志的权力;当它的日志同步到最新后就进入到第二阶段,由Leader赋予投票权,从而成为集群中完整的一员。删除成员相对比较简单,所有服务器收到删除成员的日志后,立马将该成员的信息从本地抹除。
添加成员过程
管理员向Leader发送添加成员命令
Leader添加一条 RAFT_LOGTYPE_ADD_NONVOTING_NODE日志,即添加没有投票权的服务器。该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息。
当新成员的日志同步到最新后,Leader添加一条 RAFT_LOGTYPE_ADD_NODE日志,即有投票权的服务器,同样地,该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息,以后的投票活动会将新成员考虑进去。
删除成员过程
管理员向Leader发送删除成员命令。
Leader添加一条 RAFT_LOGTYPE_REMOVE_NODE 日志,并跟普通日志一样同步给其它服务器。收到该日志的服务器立即将被成员信息从本地删除。
感谢大家的阅读,希望大家收益多多。
推荐教程:《C语言》
위 내용은 C 언어로 Raft 구현(코드 포함)의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!