首頁  >  文章  >  後端開發  >  如何使用 etcd Raft 庫建立自己的分散式 KV 儲存系統

如何使用 etcd Raft 庫建立自己的分散式 KV 儲存系統

WBOY
WBOY原創
2024-07-17 10:55:18258瀏覽

How to Build Your Own Distributed KV Storage System Using the etcd Raft Library

介紹

raftexample是etcd提供的範例,示範了etcd raft共識演算法庫的使用。 raftexample 最終實作了一個提供 REST API 的分散式鍵值儲存服務。

本文將對raftexample的程式碼進行閱讀與分析,希望能幫助讀者更能理解如何使用etcd raft函式庫以及raft函式庫的實作邏輯。

建築學

raftexample的架構非常簡單,主要文件如下:

  • main.go: 負責組織 raft 模組、httpapi 模組、kvstore 模組之間的互動;
  • raft.go: 負責與raft庫交互,包括提交提案、接收需要發送的RPC訊息、進行網路傳輸等;
  • httpapi.go: 負責提供REST API,作為使用者請求的入口;
  • kvstore.go: 負責持久化儲存提交的日誌條目,相當於raft協定中的狀態機。

寫入請求的處理流程

寫入請求透過 HTTP PUT 請求到達 httpapi 模組的 ServeHTTP 方法。

curl -L http://127.0.0.1:12380/key -XPUT -d value

透過switch配對到HTTP請求方法後,進入PUT方法處理流程:

  • 從HTTP請求體讀取內容(即數值);
  • 透過kvstore模組的Propose方法建構提案(新增以key為key、value為value的鍵值對);
  • 由於沒有資料可回,所以回應客戶端204 StatusNoContent;

透過 raft 演算法庫提供的 Propose 方法將提案提交至 raft 演算法庫。

提案的內容可以是增加新的鍵值對、更新現有的鍵值對等

// httpapi.go
v, err := io.ReadAll(r.Body)
if err != nil {
    log.Printf("Failed to read on PUT (%v)\n", err)
    http.Error(w, "Failed on PUT", http.StatusBadRequest)
    return
}
h.store.Propose(key, string(v))
w.WriteHeader(http.StatusNoContent)

接下來,讓我們來看看kvstore模組的Propose方法,看看提案是如何建構和處理的。

在Propose方法中,我們首先使用gob對要寫入的鍵值對進行編碼,然後將編碼內容傳遞給proposeC,proposeC是負責將kvstore模組建構的proposal傳送到raft模組的通道。

// kvstore.go
func (s *kvstore) Propose(k string, v string) {
    var buf strings.Builder
    if err := gob.NewEncoder(&buf).Encode(kv{k, v}); err != nil {
        log.Fatal(err)
    }
    s.proposeC <- buf.String()
}

kvstore建構並傳遞給proposeC的proposal由raft模組中的serveChannels方法接收和處理。

在確認proposeC尚未關閉後,raft模組使用raft演算法庫提供的Propose方法將proposal提交給raft演算法庫進行處理。

// raft.go
select {
    case prop, ok := <-rc.proposeC:
    if !ok {
        rc.proposeC = nil
    } else {
        rc.node.Propose(context.TODO(), []byte(prop))
    }

提案提交後,遵循raft演算法流程。提案最終將轉發到領導節點(如果當前節點不是領導節點,並且您允許追隨者轉發提案,由 DisableProposalForwarding 配置控制)。 Leader 會將提案作為日誌條目加入其 raft 日誌中,並與其他 follower 節點同步。被視為已提交後,將應用到狀態機並將結果傳回給使用者。

但是,由於etcd raft函式庫本身不處理節點之間的通訊、追加到raft日誌、應用到狀態機等,所以raft函式庫只準備這些操作所需的資料。實際操作必須由我們來執行。

因此,我們需要從 raft 庫接收這些數據,並根據其類型進行相應的處理。 Ready方法傳回一個唯讀通道,透過該通道我們可以接收需要處理的資料。

需要注意的是,接收到的資料包含多個字段,例如要套用的快照、要附加到 raft 日誌的日誌條目、要透過網路傳輸的訊息等。

繼續我們的寫入請求範例(Leader 節點),收到對應資料後,我們需要持久保存快照、HardState 和 Entries,以處理伺服器崩潰引起的問題(例如,一個 follower 為多個候選人投票)。 HardState 和 Entries 共同構成了本文中提到的所有伺服器上的持久狀態。持久保存它們後,我們可以套用快照並追加到 raft 日誌中。

Since we are currently the leader node, the raft library will return MsgApp type messages to us (corresponding to AppendEntries RPC in the paper). We need to send these messages to the follower nodes. Here, we use the rafthttp provided by etcd for node communication and send the messages to follower nodes using the Send method.

// raft.go
case rd := <-rc.node.Ready():
    if !raft.IsEmptySnap(rd.Snapshot) {
        rc.saveSnap(rd.Snapshot)
    }
    rc.wal.Save(rd.HardState, rd.Entries)
    if !raft.IsEmptySnap(rd.Snapshot) {
        rc.raftStorage.ApplySnapshot(rd.Snapshot)
        rc.publishSnapshot(rd.Snapshot)
    }
    rc.raftStorage.Append(rd.Entries)
    rc.transport.Send(rc.processMessages(rd.Messages))
    applyDoneC, ok := rc.publishEntries(rc.entriesToApply(rd.CommittedEntries))
    if !ok {
        rc.stop()
        return
    }
    rc.maybeTriggerSnapshot(applyDoneC)
    rc.node.Advance()

Next, we use the publishEntries method to apply the committed raft log entries to the state machine. As mentioned earlier, in raftexample, the kvstore module acts as the state machine. In the publishEntries method, we pass the log entries that need to be applied to the state machine to commitC. Similar to the earlier proposeC, commitC is responsible for transmitting the log entries that the raft module has deemed committed to the kvstore module for application to the state machine.

// raft.go
rc.commitC <- &commit{data, applyDoneC}

In the readCommits method of the kvstore module, messages read from commitC are gob-decoded to retrieve the original key-value pairs, which are then stored in a map structure within the kvstore module.

// kvstore.go
for commit := range commitC {
    ...
    for _, data := range commit.data {
        var dataKv kv
        dec := gob.NewDecoder(bytes.NewBufferString(data))
        if err := dec.Decode(&dataKv); err != nil {
            log.Fatalf("raftexample: could not decode message (%v)", err)
        }
        s.mu.Lock()
        s.kvStore[dataKv.Key] = dataKv.Val
        s.mu.Unlock()
    }
    close(commit.applyDoneC)
}

Returning to the raft module, we use the Advance method to notify the raft library that we have finished processing the data read from the Ready channel and are ready to process the next batch of data.

Earlier, on the leader node, we sent MsgApp type messages to the follower nodes using the Send method. The follower node's rafthttp listens on the corresponding port to receive requests and return responses. Whether it's a request received by a follower node or a response received by a leader node, it will be submitted to the raft library for processing through the Step method.

raftNode implements the Raft interface in rafthttp, and the Process method of the Raft interface is called to handle the received request content (such as MsgApp messages).

// raft.go
func (rc *raftNode) Process(ctx context.Context, m raftpb.Message) error {
    return rc.node.Step(ctx, m)
}

The above describes the complete processing flow of a write request in raftexample.

Summary

This concludes the content of this article. By outlining the structure of raftexample and detailing the processing flow of a write request, I hope to help you better understand how to use the etcd raft library to build your own distributed KV storage service.

If there are any mistakes or issues, please feel free to comment or message me directly. Thank you.

References

  • https://github.com/etcd-io/etcd/tree/main/contrib/raftexample

  • https://github.com/etcd-io/raft

  • https://raft.github.io/raft.pdf

以上是如何使用 etcd Raft 庫建立自己的分散式 KV 儲存系統的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文內容由網友自願投稿,版權歸原作者所有。本站不承擔相應的法律責任。如發現涉嫌抄襲或侵權的內容,請聯絡admin@php.cn