C 言語 Raft 実装 (コード付き)
1. はじめに
この記事では、簡単な Raft 実装を紹介します。 Raft 論文を読んでいれば、実装の詳細は Raft 論文に詳しく記載されているため、この Raft 実装は読みやすいと思いますが、エンジニアリング実装は基本的に Raft 論文の記述を再表現することになります。プログラミング言語で。これが Paxos に対する Raft の最大の利点です。つまり、理解と実装が簡単です。この記事で紹介するRaftの実装はC言語で記述されており、ログ圧縮機能以外の機能は実装されており、メンバー変更の仕組みも比較的シンプルで、一度に1つの設定変更のみをサポートしています。 Raft の原則については、Raft の論文と「Raft Understanding」を参照してください。
2. Raft の基本概念
2.1 ステータス
raft には、リーダー、候補、フォロワーの 3 つの状態があります。これら 3 つの状態間の遷移を次の図に示します。リーダーのみが顧客のリクエストを処理し、ログをフォロワーにコピーする権利を持っています。候補はフォロワーとリーダーの間の中間状態です。クラスター内にリーダーがいない場合、フォロワーは候補状態に入り、クラスターへの投票を開始します。過半数の票を獲得したフォロワーがリーダーになります。
2.2 メッセージ
Raft では、プロトコルの分かりやすさを向上させるために、メッセージ タイプの設定を簡略化し、次の 2 つのリクエストのみを備えています。 [インデント]
requestVote 投票リクエストを開始します。投票を開始する際の候補者のリクエスト。これは、クラスター内の他のフォロワーおよび候補によって受信され、処理されます。
appendEntries ログ リクエストを追加します。ログをフォロワーに追加するときにリーダーによって発行されるリクエスト。 [/indent]
2.3 用語番号
用語番号の用語は、時間の新旧の関係を示すために Raft プロトコルで使用されます。各リーダーの任期は不変ですが、リーダーごとに完全に異なり、時間とともに単調に増加します。リクエスト A の期間がリクエスト B の期間より長い場合、リクエスト B は期限切れになります。
3.Raft の実装
3.1 プロトコル
まず、上記のデータ構造に対応する 4 つの重要なデータ構造を紹介します。そして、appendEntries のリクエストと応答。
/** requestVote 请求投票 * 竞选者Candidate去竞选Leader时发送给其它node的投票请求。 * 其它Leader或者Candidate收到term比自己大的投票请求时,会自动变成Follower*/ typedef struct { /** 当前任期号,通过任期号的大小与其它Candidate竞争Leader */ int term; /** 竞选者的id */ int candidate_id; /** 竞选者本地保存的最新一条日志的index */ int last_log_idx; /** 竞选者本地保存的最新一条日志的任期号*/ int last_log_term; } msg_requestvote_t; /** 投票请求的回复response. * 该response主要是给返回某个node是否接收了Candidate的投票请求. */ typedef struct { /** node的任期号,Candidate根据投票结果和node的任期号来更新自己的任期号 */ int term; /** 投票结果,如果node给Candidate投票则为true */ int vote_granted; } msg_requestvote_response_t; /** 添加日志请求. * Follower可以从该消息中知道哪些日志可以安全地提交到状态机FSM中去。 * Leader可以将该消息作为心跳消息定期发送。 * 旧的Leader和Candidate收到该消息后可能会自动变成Follower */ typedef struct { /** Leader当前的任期号 */ int term; /** 最新日志的前一条日志的index,用于Follower确认与Leader的日志完全一致 */ int prev_log_idx; /** 最新日志的前一条日志的任期号term */ int prev_log_term; /** leader当前已经确认提交到状态机FSM的日志索引index,这意味着Follower也可以安全地将该索引index以前的日志提交 */ int leader_commit; /** 这条添加日志消息携带的日志条数,该实现中最多只有一条 */ int n_entries; /** 这条添加日志消息中携带的日志数组 */ msg_entry_t* entries; } msg_appendentries_t; /** 添加日志回复. * 旧的Leader或Candidate收到该消息会变成Follower */ typedef struct { /** 当前任期号 */ int term; /** node成功添加日志时返回ture,即prev_log_index和prev_log_term都比对成功。否则返回false */ int success; /* 下面两个字段不是Raft论文中规定的字段: /* 用来优化日志追加过程,以加速日志的追加。Raft原文中的追加过程是一次只能追加一条日志*/ /** 处理添加日志请求后本地的最大日志索引 */ int current_idx; /** 从添加日志请求中接受的第一条日志索引 */ int first_idx; } msg_appendentries_response_t;
3.2 2 つの重要な抽象化
##raft_server_private_t この構造体は Raft 実装の抽象化であり、ステータスとすべてのデータを保存します。 Raftプロトコルの運用中に必要となります。
typedef struct { /* 所有服务器比较固定的状态: */ /* 服务器最后一次知道的任期号(初始化为 0,持续递增) */ int current_term; /* 记录在当前分期内给哪个Candidate投过票, */ int voted_for; /* 日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号 */ void* log; /* 变动比较频繁的变量: */ /* 已知的最大的已经被提交的日志条目的索引值 */ int commit_idx; /* 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增) */ int last_applied_idx; /* 三种状态:follower/leader/candidate */ int state; /* 计时器,周期函数每次执行时会递增改值 */ int timeout_elapsed; raft_node_t* nodes; int num_nodes; int election_timeout; int request_timeout; /* 保存Leader的信息,没有Leader时为NULL */ raft_node_t* current_leader; /* callbacks,由调用该raft实现的调用者来实现,网络IO和持久存储 * 都由调用者在callback中实现 */ raft_cbs_t cb; void* udata; /* 自己的信息 */ raft_node_t* node; /* 该raft实现每次只进行一个服务器的配置更改,该变量记录raft server * 是否正在进行配置更改*/ int voting_cfg_change_log_idx; } raft_server_private_t;
raft_node_private_t クラスター内のマシン ノードの抽象本体には、raft プロトコルの実行中に保存する必要がある他のマシンに関する情報が含まれています
typedef struct { void* udata; /*一般保存与其它机器的连接信息,由使用者决定怎么实现连接*/ int next_idx; /*对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加一)*/ int match_idx; /*对于每一个服务器,已经复制给他的日志的最高索引值*/ int flags; /*有三种取值,是相或的关系 1:该机器有给我投票 2:该机器有投票权 3: 该机器有最新的日志*/ int id; /*机器对应的id值,这个每台机器在全局都是唯一的*/ } raft_node_private_t;# # 3.3 Raft プロトコル プロセス
定期関数
Raft はいくつかのことを定期的に実行する必要があります。たとえば、リーダーは定期的にログを他のサーバーに追加する必要があります。追いつくチャンスがある; すべてのサーバーは、確認されたコミット ログをステート マシンなどに定期的に適用する必要があります。 raft_periodic関数はraft実装内で定期的に呼び出される関数であり、呼び出し周期は1000msです。マシンは、この関数でさまざまな状態でさまざまな処理を実行します。リーダーは定期的にログをフォロワーに同期します。 Follower は、一定期間内に Leader からのハートビート パケットを受信していないかどうかを定期的に検出し、受信している場合は Candidate となり、Leader への投票を開始します。リーダーとフォロワーは両方とも、送信されたログをステート マシン FSM に定期的にコミットします。
/** raft周期性执行的函数,实现raft中的定时器以及定期应用日志到状态机 */ int raft_periodic(raft_server_t* me_, int msec_since_last_period) { raft_server_private_t* me = (raft_server_private_t*)me_; /* 选举计时器;Follower每次收到Leader的心跳后会重置清0,Leader每次发送日志也会清0 */ me->timeout_elapsed += msec_since_last_period; /* Leader周期性地向Follower同步日志 */ if (me->state == RAFT_STATE_LEADER) { if (me->request_timeout <= me->timeout_elapsed) raft_send_appendentries_all(me_); } /* Follower检测选举计时器是否超时 */ else if (me->election_timeout <= me->timeout_elapsed) { if (1 < me->num_nodes) raft_election_start(me_); } /* 周期性地将已经确认commit的日志应用到状态机FSM */ if (me->last_applied_idx < me->commit_idx) if (-1 == raft_apply_entry(me_)) return -1; return 0; }
クラスター内の各サーバーには選挙タイマーがあります。タイマーのタイムアウト期間内にサーバーがリーダーからハートビートを受信しない場合、クラスターは考慮されます。サーバーにリーダーがいない、またはリーダーがダウンしている場合、サーバーは候補となり、リーダーに立候補するための投票を開始します。次の raft_become_candidate 関数は、サーバーが候補になるための関数です。この関数は主に次のことを行います。内容:
現在の任期番号 (currentTerm) を増やす
#自分に投票する
#選挙タイムアウト タイマーをリセットする
送信 投票を要求する RPC は他のすべてのサーバーに送信されます
/** Follower成为Candidate执行的函数 */ void raft_become_candidate(raft_server_t* me_) { raft_server_private_t* me = (raft_server_private_t*)me_; int i; /*自增当前的任期号;给自己投票,设置自己的状态为CANDIDATE*/ raft_set_current_term(me_, raft_get_current_term(me_) + 1); for (i = 0; i < me->num_nodes; i++) raft_node_vote_for_me(me->nodes[i], 0); raft_vote(me_, me->node); me->current_leader = NULL; raft_set_state(me_, RAFT_STATE_CANDIDATE); /* 重置选举超时计时器。为了防止多个Candidate竞争,将下一次发起投票的时间间隔设置成随机值*/ /* TODO: this should probably be lower */ me->timeout_elapsed = rand() % me->election_timeout; /*发送请求投票的 RPC 给其他所有服务器*/ for (i = 0; i < me->num_nodes; i++) if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i])) raft_send_requestvote(me_, me->nodes[i]); }
投票リクエストの処理
投票リクエストを処理するロジックは、主に投票に同意するかどうかを決定することです。判定はリクエスト内の用語番号とログ情報鮮度と古さのレベル、同じ用語番号を持つ他のサーバーに投票したかどうかです。以前に投票したことがある場合、再度投票することはできません。1 人につき 1 票のみです。term > currentTerm の場合、フォロワー モードに切り替えます。 ここで投票リクエストを受信するサーバーは、ネットワーク状態が悪いリーダーであるか、まだ投票リクエストを発行する時間がない候補者である可能性があります。彼らは、自分の任期番号よりも新しい任期番号を持つリクエストを受信した後、次のことを行う必要があります。無条件でフォロワーになり、リーダーが 1 人だけ存在するようにします。
如果term daab63a7de571b85be82bb1122a6d0ecterm即为过时)
follower是否成功添加日志,如果添加失败,则减小发给follower的日志索引nextIndex再重试;如果添加成功则更新本地记录的follower日志信息,并检查日志是否最新,如果不是最新则继续发送添加日志请求。
新机器的日志添加,详见3.4节-- 成员变更
/** 处理添加日志请求回复 * / int raft_recv_appendentries_response(raft_server_t* me_, raft_node_t* node, msg_appendentries_response_t* r) { raft_server_private_t* me = (raft_server_private_t*)me_; __log(me_, node, "received appendentries response %s ci:%d rci:%d 1stidx:%d", r->success == 1 ? "SUCCESS" : "fail", raft_get_current_idx(me_), r->current_idx, r->first_idx); /* 过时的回复 -- 忽略 */ if (r->current_idx != 0 && r->current_idx <= raft_node_get_match_idx(node)) return 0; /* oh~我不是Leader */ if (!raft_is_leader(me_)) return -1; /* 回复中的term比自己的要大,说明自己是一个过时的Leader,无条件转为Follower */ if (me->current_term < r->term) { raft_set_current_term(me_, r->term); raft_become_follower(me_); return 0; } /* 过时的回复,网络状况不好时会出现 */ else if (me->current_term != r->term) return 0; /* stop processing, this is a node we don't have in our configuration */ if (!node) return 0; /* 由于日志不一致导致添加日志不成功*/ if (0 == r->success) { assert(0 <= raft_node_get_next_idx(node)); /* 将nextIdex减*/ int next_idx = raft_node_get_next_idx(node); assert(0 <= next_idx); /* Follower的日志数量还远远少于Leader,将nextIdex设为回复中的current_idx+1和Leader * 当前索引中较小的一个,一般回复中的current_idx+1会比较小*/ if (r->current_idx < next_idx - 1) raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_))); /* Follower的日志数量和Leader差不多,但是比对前一条日志时失败,这种情况将next_idx减1 * 重试*/ else raft_node_set_next_idx(node, next_idx - 1); /* 使用更新后的nextIdx重新发送添加日志请求 */ raft_send_appendentries(me_, node); return 0; } assert(r->current_idx <= raft_get_current_idx(me_)); /* 下面处理添加日志请求的情况 */ /* 更新本地记录的Follower的日志情况 */ raft_node_set_next_idx(node, r->current_idx + 1); raft_node_set_match_idx(node, r->current_idx); /* 如果是新加入的机器,则判断它的日志是否是最新,如果达到了最新,则赋予它投票权, * 这里逻辑的详细解释在第3.4节 -- 成员变更*/ if (!raft_node_is_voting(node) && -1 == me->voting_cfg_change_log_idx && raft_get_current_idx(me_) <= r->current_idx + 1 && me->cb.node_has_sufficient_logs && 0 == raft_node_has_sufficient_logs(node) ) { raft_node_set_has_sufficient_logs(node); me->cb.node_has_sufficient_logs(me_, me->udata, node); } /* 如果一条日志回复成功的数量超过一半,则将日志提交commit,即允许应用到状态机 */ int votes = 1; /* include me */ int point = r->current_idx; int i; for (i = 0; i < me->num_nodes; i++) { if (me->node == me->nodes[i] || !raft_node_is_voting(me->nodes[i])) continue; int match_idx = raft_node_get_match_idx(me->nodes[i]); if (0 < match_idx) { raft_entry_t* ety = raft_get_entry_from_idx(me_, match_idx); /*如果follower已经添加了索引大于等于r->current_idx的日志,则vote加1*/ if (ety->term == me->current_term && point <= match_idx) votes++; } } /* 投票数大于所有服务器的一半,则将日志提交 */ if (me->num_nodes / 2 < votes && raft_get_commit_idx(me_) < point) raft_set_commit_idx(me_, point); /* 如果follower的日志还没有最新,那么继续发送添加日志请求 */ if (raft_get_entry_from_idx(me_, raft_node_get_next_idx(node))) raft_send_appendentries(me_, node); /* periodic applies committed entries lazily */ return 0; }
3.3 成员变更
成员的变更都是以日志的形式下发的。添加的新成员分两阶段进行,第一阶段中新成员没有有投票权,但是有接收日志的权力;当它的日志同步到最新后就进入到第二阶段,由Leader赋予投票权,从而成为集群中完整的一员。删除成员相对比较简单,所有服务器收到删除成员的日志后,立马将该成员的信息从本地抹除。
添加成员过程
管理员向Leader发送添加成员命令
Leader添加一条 RAFT_LOGTYPE_ADD_NONVOTING_NODE日志,即添加没有投票权的服务器。该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息。
当新成员的日志同步到最新后,Leader添加一条 RAFT_LOGTYPE_ADD_NODE日志,即有投票权的服务器,同样地,该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息,以后的投票活动会将新成员考虑进去。
删除成员过程
管理员向Leader发送删除成员命令。
Leader添加一条 RAFT_LOGTYPE_REMOVE_NODE 日志,并跟普通日志一样同步给其它服务器。收到该日志的服务器立即将被成员信息从本地删除。
感谢大家的阅读,希望大家收益多多。
推荐教程:《C语言》
以上がC言語によるRaft実装(コード付き)の詳細内容です。詳細については、PHP 中国語 Web サイトの他の関連記事を参照してください。