c語言Raft實作(附程式碼)
##1. 簡介
本文介紹一個簡單的Raft實作。如果有看過Raft論文,那麼看這篇Raft實作會覺得比較輕鬆,因為Raft論文中把實現的細節描述的非常詳細,工程實作基本上就是將Raft論文中的描述用程式語言重新表達一遍。這就是Raft相對於Paxos最大的優點,即容易看懂且容易實現。本文介紹的Raft實作是用C語言碼成的,除了日誌壓縮功能沒有實現,其它特性都有實現,成員變更機制也做的比較簡單,一次只支援一條配置更改。關於Raft的原理可以看Raft論文和《Raft理解》。2.Raft基本概念
2.1 狀態
raft有三種狀態:Leader,Candidate和Follower。這三種狀態的轉換如下圖所示。只有Leader具有處理客戶請求和向Follower複製日誌的權利。 Candidate是一種Follower轉換到Leader的中間狀態,當集群中沒有Leader的時候,Follower進入Candidate狀態,並向集群中發起投票,獲取到大多數投票的Follower會變成Leader。2.2 訊息
Raft為了提高協定的可理解性,訊息類型的設定及其精簡,只有下面兩種請求。 [indent]
requestVote 發起投票請求。 Candidate發起投票時的請求。由集群中其它Follower和Candidate接收處理。
appendEntries 新增日誌請求。 Leader為Follower新增日誌時所發出的請求。 [/indent]
2.3 任期號
Raft協議中使用任期號term來表示時間的新舊關係,這個term值在每個Leader的任期內是不變的,在不同Leader的中是絕對不同且隨時間單調遞增的。如果一條請求A的term比另一個請求B大,那麼表示請求B是過時的。3.Raft實作
3.1 協定
先介紹四個重要資料結構,對應上面提到過的requestVote和appendEntries請求和回覆。/** requestVote 请求投票 * 竞选者Candidate去竞选Leader时发送给其它node的投票请求。 * 其它Leader或者Candidate收到term比自己大的投票请求时,会自动变成Follower*/ typedef struct { /** 当前任期号,通过任期号的大小与其它Candidate竞争Leader */ int term; /** 竞选者的id */ int candidate_id; /** 竞选者本地保存的最新一条日志的index */ int last_log_idx; /** 竞选者本地保存的最新一条日志的任期号*/ int last_log_term; } msg_requestvote_t; /** 投票请求的回复response. * 该response主要是给返回某个node是否接收了Candidate的投票请求. */ typedef struct { /** node的任期号,Candidate根据投票结果和node的任期号来更新自己的任期号 */ int term; /** 投票结果,如果node给Candidate投票则为true */ int vote_granted; } msg_requestvote_response_t; /** 添加日志请求. * Follower可以从该消息中知道哪些日志可以安全地提交到状态机FSM中去。 * Leader可以将该消息作为心跳消息定期发送。 * 旧的Leader和Candidate收到该消息后可能会自动变成Follower */ typedef struct { /** Leader当前的任期号 */ int term; /** 最新日志的前一条日志的index,用于Follower确认与Leader的日志完全一致 */ int prev_log_idx; /** 最新日志的前一条日志的任期号term */ int prev_log_term; /** leader当前已经确认提交到状态机FSM的日志索引index,这意味着Follower也可以安全地将该索引index以前的日志提交 */ int leader_commit; /** 这条添加日志消息携带的日志条数,该实现中最多只有一条 */ int n_entries; /** 这条添加日志消息中携带的日志数组 */ msg_entry_t* entries; } msg_appendentries_t; /** 添加日志回复. * 旧的Leader或Candidate收到该消息会变成Follower */ typedef struct { /** 当前任期号 */ int term; /** node成功添加日志时返回ture,即prev_log_index和prev_log_term都比对成功。否则返回false */ int success; /* 下面两个字段不是Raft论文中规定的字段: /* 用来优化日志追加过程,以加速日志的追加。Raft原文中的追加过程是一次只能追加一条日志*/ /** 处理添加日志请求后本地的最大日志索引 */ int current_idx; /** 从添加日志请求中接受的第一条日志索引 */ int first_idx; } msg_appendentries_response_t;
3.2 兩個重要的抽象
raft_server_private_t 這個結構體是Raft在實作中的抽象體,保存了Raft協定運作過程中狀態和所需的所有資料。
typedef struct { /* 所有服务器比较固定的状态: */ /* 服务器最后一次知道的任期号(初始化为 0,持续递增) */ int current_term; /* 记录在当前分期内给哪个Candidate投过票, */ int voted_for; /* 日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号 */ void* log; /* 变动比较频繁的变量: */ /* 已知的最大的已经被提交的日志条目的索引值 */ int commit_idx; /* 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增) */ int last_applied_idx; /* 三种状态:follower/leader/candidate */ int state; /* 计时器,周期函数每次执行时会递增改值 */ int timeout_elapsed; raft_node_t* nodes; int num_nodes; int election_timeout; int request_timeout; /* 保存Leader的信息,没有Leader时为NULL */ raft_node_t* current_leader; /* callbacks,由调用该raft实现的调用者来实现,网络IO和持久存储 * 都由调用者在callback中实现 */ raft_cbs_t cb; void* udata; /* 自己的信息 */ raft_node_t* node; /* 该raft实现每次只进行一个服务器的配置更改,该变量记录raft server * 是否正在进行配置更改*/ int voting_cfg_change_log_idx; } raft_server_private_t;
raft_node_private_t 叢集中機器節點的抽象體,包含了raft協定運作過程中需要保存的其它機器上的信息
typedef struct { void* udata; /*一般保存与其它机器的连接信息,由使用者决定怎么实现连接*/ int next_idx; /*对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加一)*/ int match_idx; /*对于每一个服务器,已经复制给他的日志的最高索引值*/ int flags; /*有三种取值,是相或的关系 1:该机器有给我投票 2:该机器有投票权 3: 该机器有最新的日志*/ int id; /*机器对应的id值,这个每台机器在全局都是唯一的*/ } raft_node_private_t;
3.3 Raft協定過程
週期函數 Raft需要週期性地做一些事情,例如Leader需要周期性地給其它伺服器append日誌,以讓日誌落後的伺服器有機會追上來;所有伺服器需要周期性地將已經確認提交的日誌應用到狀態機中去等等。 raft_periodic函數是該raft實作中被週期性呼叫的函數,呼叫週期是1000ms。機器在不同狀態下會在這個函數中做不同的事情。 Leader週期性地向Follower同步日誌。而Follower週期性地檢測是否在特定的時間內沒有收到過來自Leader的心跳包,如果是的話就變成Candidate開始發起投票競選Leader。不管是Leader還是Follower,都會週期性地將已經提交的日誌commit到狀態機FSM中去。
/** raft周期性执行的函数,实现raft中的定时器以及定期应用日志到状态机 */ int raft_periodic(raft_server_t* me_, int msec_since_last_period) { raft_server_private_t* me = (raft_server_private_t*)me_; /* 选举计时器;Follower每次收到Leader的心跳后会重置清0,Leader每次发送日志也会清0 */ me->timeout_elapsed += msec_since_last_period; /* Leader周期性地向Follower同步日志 */ if (me->state == RAFT_STATE_LEADER) { if (me->request_timeout <= me->timeout_elapsed) raft_send_appendentries_all(me_); } /* Follower检测选举计时器是否超时 */ else if (me->election_timeout <= me->timeout_elapsed) { if (1 < me->num_nodes) raft_election_start(me_); } /* 周期性地将已经确认commit的日志应用到状态机FSM */ if (me->last_applied_idx < me->commit_idx) if (-1 == raft_apply_entry(me_)) return -1; return 0; }
成為競選者Candidate 叢集中每個伺服器都有競選計時器,當一個伺服器在計時器逾時時間內都沒有收到來自Leader的心跳,則認為集群中不存在Leader或Leader掛了,該伺服器就會變成Candidate,進而發起投票去競選Leader,下面raft_become_candidate函數就是伺服器變成Candidate的函數,函數主要做這幾件事:
/** Follower成为Candidate执行的函数 */ void raft_become_candidate(raft_server_t* me_) { raft_server_private_t* me = (raft_server_private_t*)me_; int i; /*自增当前的任期号;给自己投票,设置自己的状态为CANDIDATE*/ raft_set_current_term(me_, raft_get_current_term(me_) + 1); for (i = 0; i < me->num_nodes; i++) raft_node_vote_for_me(me->nodes[i], 0); raft_vote(me_, me->node); me->current_leader = NULL; raft_set_state(me_, RAFT_STATE_CANDIDATE); /* 重置选举超时计时器。为了防止多个Candidate竞争,将下一次发起投票的时间间隔设置成随机值*/ /* TODO: this should probably be lower */ me->timeout_elapsed = rand() % me->election_timeout; /*发送请求投票的 RPC 给其他所有服务器*/ for (i = 0; i < me->num_nodes; i++) if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i])) raft_send_requestvote(me_, me->nodes[i]); }
處理投票請求 處理投票請求的邏輯主要就是判斷是否要同意投票,判斷的依據就是請求中的任期號碼和日誌資訊的新舊程度,還有就是自己是否給其它相同任期號的服務器投過票,如果投過就不能再投,每人只有一票投票權。
如果term > currentTerm, 則轉換為Follower模式。這裡收到投票請求的伺服器有可能是一個網路狀況不佳的Leader或是一個還沒來得及發出投票請求的Candidate,他們收到任期號比自己要新的請求後,都要無條件變成Follower,以保證只有一個Leader存在
如果term daab63a7de571b85be82bb1122a6d0ecterm即为过时)
follower是否成功添加日志,如果添加失败,则减小发给follower的日志索引nextIndex再重试;如果添加成功则更新本地记录的follower日志信息,并检查日志是否最新,如果不是最新则继续发送添加日志请求。
新机器的日志添加,详见3.4节-- 成员变更
/** 处理添加日志请求回复 * / int raft_recv_appendentries_response(raft_server_t* me_, raft_node_t* node, msg_appendentries_response_t* r) { raft_server_private_t* me = (raft_server_private_t*)me_; __log(me_, node, "received appendentries response %s ci:%d rci:%d 1stidx:%d", r->success == 1 ? "SUCCESS" : "fail", raft_get_current_idx(me_), r->current_idx, r->first_idx); /* 过时的回复 -- 忽略 */ if (r->current_idx != 0 && r->current_idx <= raft_node_get_match_idx(node)) return 0; /* oh~我不是Leader */ if (!raft_is_leader(me_)) return -1; /* 回复中的term比自己的要大,说明自己是一个过时的Leader,无条件转为Follower */ if (me->current_term < r->term) { raft_set_current_term(me_, r->term); raft_become_follower(me_); return 0; } /* 过时的回复,网络状况不好时会出现 */ else if (me->current_term != r->term) return 0; /* stop processing, this is a node we don't have in our configuration */ if (!node) return 0; /* 由于日志不一致导致添加日志不成功*/ if (0 == r->success) { assert(0 <= raft_node_get_next_idx(node)); /* 将nextIdex减*/ int next_idx = raft_node_get_next_idx(node); assert(0 <= next_idx); /* Follower的日志数量还远远少于Leader,将nextIdex设为回复中的current_idx+1和Leader * 当前索引中较小的一个,一般回复中的current_idx+1会比较小*/ if (r->current_idx < next_idx - 1) raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_))); /* Follower的日志数量和Leader差不多,但是比对前一条日志时失败,这种情况将next_idx减1 * 重试*/ else raft_node_set_next_idx(node, next_idx - 1); /* 使用更新后的nextIdx重新发送添加日志请求 */ raft_send_appendentries(me_, node); return 0; } assert(r->current_idx <= raft_get_current_idx(me_)); /* 下面处理添加日志请求的情况 */ /* 更新本地记录的Follower的日志情况 */ raft_node_set_next_idx(node, r->current_idx + 1); raft_node_set_match_idx(node, r->current_idx); /* 如果是新加入的机器,则判断它的日志是否是最新,如果达到了最新,则赋予它投票权, * 这里逻辑的详细解释在第3.4节 -- 成员变更*/ if (!raft_node_is_voting(node) && -1 == me->voting_cfg_change_log_idx && raft_get_current_idx(me_) <= r->current_idx + 1 && me->cb.node_has_sufficient_logs && 0 == raft_node_has_sufficient_logs(node) ) { raft_node_set_has_sufficient_logs(node); me->cb.node_has_sufficient_logs(me_, me->udata, node); } /* 如果一条日志回复成功的数量超过一半,则将日志提交commit,即允许应用到状态机 */ int votes = 1; /* include me */ int point = r->current_idx; int i; for (i = 0; i < me->num_nodes; i++) { if (me->node == me->nodes[i] || !raft_node_is_voting(me->nodes[i])) continue; int match_idx = raft_node_get_match_idx(me->nodes[i]); if (0 < match_idx) { raft_entry_t* ety = raft_get_entry_from_idx(me_, match_idx); /*如果follower已经添加了索引大于等于r->current_idx的日志,则vote加1*/ if (ety->term == me->current_term && point <= match_idx) votes++; } } /* 投票数大于所有服务器的一半,则将日志提交 */ if (me->num_nodes / 2 < votes && raft_get_commit_idx(me_) < point) raft_set_commit_idx(me_, point); /* 如果follower的日志还没有最新,那么继续发送添加日志请求 */ if (raft_get_entry_from_idx(me_, raft_node_get_next_idx(node))) raft_send_appendentries(me_, node); /* periodic applies committed entries lazily */ return 0; }
3.3 成员变更
成员的变更都是以日志的形式下发的。添加的新成员分两阶段进行,第一阶段中新成员没有有投票权,但是有接收日志的权力;当它的日志同步到最新后就进入到第二阶段,由Leader赋予投票权,从而成为集群中完整的一员。删除成员相对比较简单,所有服务器收到删除成员的日志后,立马将该成员的信息从本地抹除。
添加成员过程
管理员向Leader发送添加成员命令
Leader添加一条 RAFT_LOGTYPE_ADD_NONVOTING_NODE日志,即添加没有投票权的服务器。该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息。
当新成员的日志同步到最新后,Leader添加一条 RAFT_LOGTYPE_ADD_NODE日志,即有投票权的服务器,同样地,该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息,以后的投票活动会将新成员考虑进去。
删除成员过程
管理员向Leader发送删除成员命令。
Leader添加一条 RAFT_LOGTYPE_REMOVE_NODE 日志,并跟普通日志一样同步给其它服务器。收到该日志的服务器立即将被成员信息从本地删除。
感谢大家的阅读,希望大家收益多多。
推荐教程:《C语言》
以上是c語言Raft實作(附程式碼)的詳細內容。更多資訊請關注PHP中文網其他相關文章!