c语言Raft实现(附代码)
1. 简介
本文介绍一个简单的Raft实现。如果有看过Raft论文,那么看这个Raft实现会觉得比较轻松,因为Raft论文中把实现的细节描述的非常详细,工程实现基本上就是将Raft论文中的描述用编程语言重新表达一遍。这就是Raft相对于Paxos最大的优点,即容易看懂并且容易实现。本文中介绍的Raft实现是用C语言码成的,除了日志压缩功能没有实现,其它特性都有实现,成员变更机制也做的比较简单,一次只支持一条配置更改。关于Raft的原理可以看Raft论文和《Raft理解》。
2.Raft基本概念
2.1 状态
raft有三种状态:Leader,Candidate和Follower。这三种状态的转换如下图所示。只有Leader具有处理客户请求和向Follower复制日志的权利。Candidate是一种Follower向Leader转换的中间状态,当集群中没有Leader的时候,Follower进入Candidate状态,并向集群中发起投票,获取到大多数投票的Follower会变成Leader。
2.2 消息
Raft为了提高协议的可理解性,消息类型的设定及其精简,只有下面两种请求。[indent]
requestVote 发起投票请求。Candidate发起投票时的请求。由集群中其它Follower和Candidate接收处理。
appendEntries 添加日志请求。Leader向Follower添加日志时发出的请求。[/indent]
2.3 任期号
Raft协议中使用任期号term来表明时间的新旧关系,这个term值在每个Leader的任期内是不变的,在不同Leader的中是绝对不同且随时间单调递增的。如果一条请求A的term比另一个请求B要大,那么说明请求B是过时的。
3.Raft实现
3.1 协议
先介绍四个重要数据结构,对应上面提到过的requestVote和appendEntries请求和回复。
/** requestVote 请求投票 * 竞选者Candidate去竞选Leader时发送给其它node的投票请求。 * 其它Leader或者Candidate收到term比自己大的投票请求时,会自动变成Follower*/ typedef struct { /** 当前任期号,通过任期号的大小与其它Candidate竞争Leader */ int term; /** 竞选者的id */ int candidate_id; /** 竞选者本地保存的最新一条日志的index */ int last_log_idx; /** 竞选者本地保存的最新一条日志的任期号*/ int last_log_term; } msg_requestvote_t; /** 投票请求的回复response. * 该response主要是给返回某个node是否接收了Candidate的投票请求. */ typedef struct { /** node的任期号,Candidate根据投票结果和node的任期号来更新自己的任期号 */ int term; /** 投票结果,如果node给Candidate投票则为true */ int vote_granted; } msg_requestvote_response_t; /** 添加日志请求. * Follower可以从该消息中知道哪些日志可以安全地提交到状态机FSM中去。 * Leader可以将该消息作为心跳消息定期发送。 * 旧的Leader和Candidate收到该消息后可能会自动变成Follower */ typedef struct { /** Leader当前的任期号 */ int term; /** 最新日志的前一条日志的index,用于Follower确认与Leader的日志完全一致 */ int prev_log_idx; /** 最新日志的前一条日志的任期号term */ int prev_log_term; /** leader当前已经确认提交到状态机FSM的日志索引index,这意味着Follower也可以安全地将该索引index以前的日志提交 */ int leader_commit; /** 这条添加日志消息携带的日志条数,该实现中最多只有一条 */ int n_entries; /** 这条添加日志消息中携带的日志数组 */ msg_entry_t* entries; } msg_appendentries_t; /** 添加日志回复. * 旧的Leader或Candidate收到该消息会变成Follower */ typedef struct { /** 当前任期号 */ int term; /** node成功添加日志时返回ture,即prev_log_index和prev_log_term都比对成功。否则返回false */ int success; /* 下面两个字段不是Raft论文中规定的字段: /* 用来优化日志追加过程,以加速日志的追加。Raft原文中的追加过程是一次只能追加一条日志*/ /** 处理添加日志请求后本地的最大日志索引 */ int current_idx; /** 从添加日志请求中接受的第一条日志索引 */ int first_idx; } msg_appendentries_response_t;
3.2 两个重要的抽象
raft_server_private_t 该结构体是Raft在实现中的抽象体,保存了Raft协议运行过程中状态和需要的所有数据。
typedef struct { /* 所有服务器比较固定的状态: */ /* 服务器最后一次知道的任期号(初始化为 0,持续递增) */ int current_term; /* 记录在当前分期内给哪个Candidate投过票, */ int voted_for; /* 日志条目集;每一个条目包含一个用户状态机执行的指令,和收到时的任期号 */ void* log; /* 变动比较频繁的变量: */ /* 已知的最大的已经被提交的日志条目的索引值 */ int commit_idx; /* 最后被应用到状态机的日志条目索引值(初始化为 0,持续递增) */ int last_applied_idx; /* 三种状态:follower/leader/candidate */ int state; /* 计时器,周期函数每次执行时会递增改值 */ int timeout_elapsed; raft_node_t* nodes; int num_nodes; int election_timeout; int request_timeout; /* 保存Leader的信息,没有Leader时为NULL */ raft_node_t* current_leader; /* callbacks,由调用该raft实现的调用者来实现,网络IO和持久存储 * 都由调用者在callback中实现 */ raft_cbs_t cb; void* udata; /* 自己的信息 */ raft_node_t* node; /* 该raft实现每次只进行一个服务器的配置更改,该变量记录raft server * 是否正在进行配置更改*/ int voting_cfg_change_log_idx; } raft_server_private_t;
raft_node_private_t 集群中机器节点的抽象体,包含了raft协议运行过程中需要保存的其它机器上的信息
typedef struct { void* udata; /*一般保存与其它机器的连接信息,由使用者决定怎么实现连接*/ int next_idx; /*对于每一个服务器,需要发送给他的下一个日志条目的索引值(初始化为领导人最后索引值加一)*/ int match_idx; /*对于每一个服务器,已经复制给他的日志的最高索引值*/ int flags; /*有三种取值,是相或的关系 1:该机器有给我投票 2:该机器有投票权 3: 该机器有最新的日志*/ int id; /*机器对应的id值,这个每台机器在全局都是唯一的*/ } raft_node_private_t;
3.3 Raft协议过程
周期函数 Raft需要周期性地做一些事情,比如Leader需要周期性地给其它服务器append日志,以让日志落后的服务器有机会追上来;所有服务器需要周期性地将已经确认提交的日志应用到状态机中去等等。
raft_periodic函数是该raft实现中被周期性调用的函数,调用周期是1000ms。机器在不同状态下会在这个函数中做不同的事情。Leader周期性地向Follower同步日志。而Follower周期性地检测是否在特定的时间内没有收到过来自Leader的心跳包,如果是的话就变成Candidate开始发起投票竞选Leader。不管是Leader还是Follower,都会周期性地将已经提交的日志commit到状态机FSM中去。
/** raft周期性执行的函数,实现raft中的定时器以及定期应用日志到状态机 */ int raft_periodic(raft_server_t* me_, int msec_since_last_period) { raft_server_private_t* me = (raft_server_private_t*)me_; /* 选举计时器;Follower每次收到Leader的心跳后会重置清0,Leader每次发送日志也会清0 */ me->timeout_elapsed += msec_since_last_period; /* Leader周期性地向Follower同步日志 */ if (me->state == RAFT_STATE_LEADER) { if (me->request_timeout <= me->timeout_elapsed) raft_send_appendentries_all(me_); } /* Follower检测选举计时器是否超时 */ else if (me->election_timeout <= me->timeout_elapsed) { if (1 < me->num_nodes) raft_election_start(me_); } /* 周期性地将已经确认commit的日志应用到状态机FSM */ if (me->last_applied_idx < me->commit_idx) if (-1 == raft_apply_entry(me_)) return -1; return 0; }
成为竞选者Candidate 集群中每个服务器都有一个竞选计时器,当一个服务器在计时器超时时间内都没有收到来自Leader的心跳,则认为集群中不存在Leader或者是Leader挂了,该服务器就会变成Candidate,进而发起投票去竞选Leader,下面raft_become_candidate函数就是服务器变成Candidate的函数,函数中主要做这几件事情:
自增当前的任期号(currentTerm)
给自己投票
重置选举超时计时器
发送请求投票的 RPC 给其他所有服务器
/** Follower成为Candidate执行的函数 */ void raft_become_candidate(raft_server_t* me_) { raft_server_private_t* me = (raft_server_private_t*)me_; int i; /*自增当前的任期号;给自己投票,设置自己的状态为CANDIDATE*/ raft_set_current_term(me_, raft_get_current_term(me_) + 1); for (i = 0; i < me->num_nodes; i++) raft_node_vote_for_me(me->nodes[i], 0); raft_vote(me_, me->node); me->current_leader = NULL; raft_set_state(me_, RAFT_STATE_CANDIDATE); /* 重置选举超时计时器。为了防止多个Candidate竞争,将下一次发起投票的时间间隔设置成随机值*/ /* TODO: this should probably be lower */ me->timeout_elapsed = rand() % me->election_timeout; /*发送请求投票的 RPC 给其他所有服务器*/ for (i = 0; i < me->num_nodes; i++) if (me->node != me->nodes[i] && raft_node_is_voting(me->nodes[i])) raft_send_requestvote(me_, me->nodes[i]); }
处理投票请求 处理投票请求的逻辑主要就是判断是否要同意投票,判断的依据就是请求中的任期号和日志信息的新旧程度,还有就是自己是否给其它相同任期号的服务器投过票,如果投过就不能再投,每人只有一票投票权。
如果term > currentTerm, 则转为Follower模式。
这里收到投票请求的服务器有可能是一个网络状况不佳的Leader或者是一个还没来得及发出投票请求的Candidate,他们收到任期号比自己要新的请求后,都要无条件变成Follower,以保证只有一个Leader存在
如果term daab63a7de571b85be82bb1122a6d0ecterm即为过时)
follower是否成功添加日志,如果添加失败,则减小发给follower的日志索引nextIndex再重试;如果添加成功则更新本地记录的follower日志信息,并检查日志是否最新,如果不是最新则继续发送添加日志请求。
新机器的日志添加,详见3.4节-- 成员变更
/** 处理添加日志请求回复 * / int raft_recv_appendentries_response(raft_server_t* me_, raft_node_t* node, msg_appendentries_response_t* r) { raft_server_private_t* me = (raft_server_private_t*)me_; __log(me_, node, "received appendentries response %s ci:%d rci:%d 1stidx:%d", r->success == 1 ? "SUCCESS" : "fail", raft_get_current_idx(me_), r->current_idx, r->first_idx); /* 过时的回复 -- 忽略 */ if (r->current_idx != 0 && r->current_idx <= raft_node_get_match_idx(node)) return 0; /* oh~我不是Leader */ if (!raft_is_leader(me_)) return -1; /* 回复中的term比自己的要大,说明自己是一个过时的Leader,无条件转为Follower */ if (me->current_term < r->term) { raft_set_current_term(me_, r->term); raft_become_follower(me_); return 0; } /* 过时的回复,网络状况不好时会出现 */ else if (me->current_term != r->term) return 0; /* stop processing, this is a node we don't have in our configuration */ if (!node) return 0; /* 由于日志不一致导致添加日志不成功*/ if (0 == r->success) { assert(0 <= raft_node_get_next_idx(node)); /* 将nextIdex减*/ int next_idx = raft_node_get_next_idx(node); assert(0 <= next_idx); /* Follower的日志数量还远远少于Leader,将nextIdex设为回复中的current_idx+1和Leader * 当前索引中较小的一个,一般回复中的current_idx+1会比较小*/ if (r->current_idx < next_idx - 1) raft_node_set_next_idx(node, min(r->current_idx + 1, raft_get_current_idx(me_))); /* Follower的日志数量和Leader差不多,但是比对前一条日志时失败,这种情况将next_idx减1 * 重试*/ else raft_node_set_next_idx(node, next_idx - 1); /* 使用更新后的nextIdx重新发送添加日志请求 */ raft_send_appendentries(me_, node); return 0; } assert(r->current_idx <= raft_get_current_idx(me_)); /* 下面处理添加日志请求的情况 */ /* 更新本地记录的Follower的日志情况 */ raft_node_set_next_idx(node, r->current_idx + 1); raft_node_set_match_idx(node, r->current_idx); /* 如果是新加入的机器,则判断它的日志是否是最新,如果达到了最新,则赋予它投票权, * 这里逻辑的详细解释在第3.4节 -- 成员变更*/ if (!raft_node_is_voting(node) && -1 == me->voting_cfg_change_log_idx && raft_get_current_idx(me_) <= r->current_idx + 1 && me->cb.node_has_sufficient_logs && 0 == raft_node_has_sufficient_logs(node) ) { raft_node_set_has_sufficient_logs(node); me->cb.node_has_sufficient_logs(me_, me->udata, node); } /* 如果一条日志回复成功的数量超过一半,则将日志提交commit,即允许应用到状态机 */ int votes = 1; /* include me */ int point = r->current_idx; int i; for (i = 0; i < me->num_nodes; i++) { if (me->node == me->nodes[i] || !raft_node_is_voting(me->nodes[i])) continue; int match_idx = raft_node_get_match_idx(me->nodes[i]); if (0 < match_idx) { raft_entry_t* ety = raft_get_entry_from_idx(me_, match_idx); /*如果follower已经添加了索引大于等于r->current_idx的日志,则vote加1*/ if (ety->term == me->current_term && point <= match_idx) votes++; } } /* 投票数大于所有服务器的一半,则将日志提交 */ if (me->num_nodes / 2 < votes && raft_get_commit_idx(me_) < point) raft_set_commit_idx(me_, point); /* 如果follower的日志还没有最新,那么继续发送添加日志请求 */ if (raft_get_entry_from_idx(me_, raft_node_get_next_idx(node))) raft_send_appendentries(me_, node); /* periodic applies committed entries lazily */ return 0; }
3.3 成员变更
成员的变更都是以日志的形式下发的。添加的新成员分两阶段进行,第一阶段中新成员没有有投票权,但是有接收日志的权力;当它的日志同步到最新后就进入到第二阶段,由Leader赋予投票权,从而成为集群中完整的一员。删除成员相对比较简单,所有服务器收到删除成员的日志后,立马将该成员的信息从本地抹除。
添加成员过程
管理员向Leader发送添加成员命令
Leader添加一条 RAFT_LOGTYPE_ADD_NONVOTING_NODE日志,即添加没有投票权的服务器。该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息。
当新成员的日志同步到最新后,Leader添加一条 RAFT_LOGTYPE_ADD_NODE日志,即有投票权的服务器,同样地,该日志与其它普通日志一样同步给集群中其它服务器。收到该日志的服务器在本地保存该新成员的信息,以后的投票活动会将新成员考虑进去。
删除成员过程
管理员向Leader发送删除成员命令。
Leader添加一条 RAFT_LOGTYPE_REMOVE_NODE 日志,并跟普通日志一样同步给其它服务器。收到该日志的服务器立即将被成员信息从本地删除。
感谢大家的阅读,希望大家收益多多。
推荐教程:《C语言》
以上是c语言Raft实现(附代码)的详细内容。更多信息请关注PHP中文网其他相关文章!