Heim  >  Artikel  >  Backend-Entwicklung  >  Verwendung von ZooKeeper zur Implementierung der Dienstregistrierung und -erkennung in Beego

Verwendung von ZooKeeper zur Implementierung der Dienstregistrierung und -erkennung in Beego

WBOY
WBOYOriginal
2023-06-22 08:21:151017Durchsuche

In der Microservice-Architektur ist die Registrierung und Erkennung von Diensten ein sehr wichtiges Thema. Um dieses Problem zu lösen, können wir ZooKeeper als Service-Registrierungscenter verwenden. In diesem Artikel stellen wir vor, wie Sie ZooKeeper im Beego-Framework verwenden, um die Registrierung und Erkennung von Diensten zu implementieren.

1. Einführung in ZooKeeper

ZooKeeper ist ein verteilter Open-Source-Koordinierungsdienst. Es ist eines der Unterprojekte von Apache Hadoop. Die Hauptaufgabe von ZooKeeper besteht darin, verteilte Anwendungen zu koordinieren und Funktionen wie verteilte Sperren, Benennungsdienste, Konfigurationsverwaltung und verteilte Synchronisierung bereitzustellen. In der Microservice-Architektur wird ZooKeeper häufig als Dienstregistrierung verwendet.

2. ZooKeeper-Installation und -Konfiguration

Informationen zur Installation und Konfiguration von ZooKeeper finden Sie in der offiziellen Website-Dokumentation: https://zookeeper.apache.org/doc/r3.6.3/index.html. Hier stellen wir nur einige häufig verwendete Konfigurationselemente vor. ZooKeeper kann in der ZooKeeper-Konfigurationsdatei zoo.cfg konfiguriert werden.

Im Folgenden sind einige wichtige Konfigurationselemente aufgeführt:

  • tickTime: Dies ist die von ZooKeeper verwendete Grundzeit in Millisekunden. Im Allgemeinen auf 2000 ms eingestellt.
  • initLimit: Dies ist das Zeitlimit für die Teilnahme an der Wahl. Die maximale Zeit für den Aufbau einer Verbindung zwischen ZooKeeper-Servern, ausgedrückt in Vielfachen von tickTime. Im Allgemeinen auf 5 eingestellt.
  • syncLimit: Dies ist das Zeitlimit für den Empfang der neuesten Transaktion vom Leader, ausgedrückt in Vielfachen von tickTime. Im Allgemeinen auf 2 eingestellt.
  • dataDir: Dies ist das Datenverzeichnis von ZooKeeper, standardmäßig /var/lib/zookeeper.
  • clientPort: Dies ist der Client-Verbindungsport, den der ZooKeeper-Server überwacht. Der Standardwert ist 2181.

3. Das Beego-Framework integriert ZooKeeper

  1. Einführung der ZooKeeper-Clientbibliothek

Die Verwendung von ZooKeeper im Beego-Framework erfordert die Einführung der ZooKeeper-Clientbibliothek. Sie können die go zookeeper-Clientbibliothek zk verwenden, um ZooKeeper-Clientoperationen zu implementieren.

Sie können den folgenden Befehl zur Installation verwenden:

go get github.com/samuel/go-zookeeper/zk

  1. Implementieren Sie die Dienstregistrierung und -erkennung

Im Beego-Framework können wir ein ZooKeeper-Clientobjekt mit kapseln Zur Dienstregistrierung und Entdeckung von ZooKeeper.

Das Folgende ist ein Beispielcode für einen ZooKeeper-Client:

package zk

import (
    "encoding/json"
    "fmt"
    "strings"
    "time"

    "github.com/samuel/go-zookeeper/zk"
)

type Server struct {
    Host string `json:"host"`
    Port int    `json:"port"`
}

type ZkClient struct {
    hosts          []string
    conn           *zk.Conn
    serversPath    string
    sessionTimeout time.Duration
}

func NewZkClient(hosts []string, serversPath string, sessionTimeout int) (*ZkClient, error) {
    // 链接zk,创建授权节点 /servers
    c, _, err := zk.Connect(hosts, time.Duration(sessionTimeout)*time.Second)
    if err != nil {
        return nil, err
    }
    if exists, _, err := c.Exists(serversPath); err != nil {
        return nil, err
    } else if !exists {
        if _, err := c.Create(serversPath, nil, 0, zk.WorldACL(zk.PermAll)); err != nil {
            return nil, fmt.Errorf("create znode error(%v)", err)
        }
    }
    return &ZkClient{
        hosts:          hosts,
        conn:           c,
        serversPath:    serversPath,
        sessionTimeout: time.Duration(sessionTimeout) * time.Second,
    }, nil
}

func (zk *ZkClient) Close() {
    zk.conn.Close()
}

// 检测授权节点是否存在
func (zk *ZkClient) ensureServerPath() error {
    exists, _, err := zk.conn.Exists(zk.serversPath)
    if err != nil {
        return err
    }
    if !exists {
        _, err = zk.conn.Create(zk.serversPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
    }
    return err
}

func (zk *ZkClient) Register(server *Server) error {
    if err := zk.ensureServerPath(); err != nil {
        return fmt.Errorf("register: ensureServerPath error(%v)", err)
    }
    //在 /servers 节点下创建一个临时性节点,节点名为 IP:Port。
    path := fmt.Sprintf("%s/%s:%d", zk.serversPath, server.Host, server.Port)
    if _, err := zk.conn.Create(path, []byte(""), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)); err != nil {
        return fmt.Errorf("register: create error(%v)", err)
    }
    return nil
}

// 获取所有服务器列表
func (zk *ZkClient) GetServers() ([]Server, error) {
    list, _, err := zk.conn.Children(zk.serversPath)
    if err != nil {
        return nil, err
    }
    servers := make([]Server, 0, len(list))
    for _, node := range list {
        data, _, err := zk.conn.Get(zk.serversPath + "/" + node)
        if err != nil {
            continue
        }
        arr := strings.Split(node, ":")
        servers = append(servers, Server{
            Host: arr[0],
            Port: str2Int(arr[1]),
        })
    }
    return servers, nil
}

func (zk *ZkClient) WatchServers() ([]Server, <-chan zk.Event, error) {
    list, _, ch, err := zk.conn.ChildrenW(zk.serversPath)
    if err != nil {
        return nil, nil, err
    }
    servers := make([]Server, 0, len(list))
    for _, node := range list {
        data, _, err := zk.conn.Get(zk.serversPath + "/" + node)
        if err != nil {
            continue
        }
        arr := strings.Split(node, ":")
        servers = append(servers, Server{
            Host: arr[0],
            Port: str2Int(arr[1]),
        })
    }
    return servers, ch, nil
}

// 删除授权节点
func (zk *ZkClient) Remove(server *Server) error {
    path := fmt.Sprintf("%s/%s:%d", zk.serversPath, server.Host, server.Port)
    return zk.conn.Delete(path, -1)
}

func str2Int(str string) int {
    var (
        num int
        err error
    )
    if num, err = strconv.Atoi(str); err != nil {
        panic(err)
    }
    return num
}

Nach der Implementierung des ZooKeeper-Clientobjekts können wir dieses Objekt verwenden, um den Dienst zu registrieren und aufzurufen.

Im Beego-Framework können wir das ZooKeeper-Clientobjekt in der Initialisierungsfunktion erstellen. Gleichzeitig können wir dieses Objekt bei der Verarbeitung von API-Anfragen verwenden, um Dienste zu erkennen und aufzurufen.

Das Folgende ist ein Beispielcode zum Registrieren und Aufrufen von Diensten mit ZooKeeper:

package controllers

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/astaxie/beego"
    "github.com/my/go-zk"
)

type MyController struct {
    beego.Controller
    zkCli *zk.ZkClient
}

func (c *MyController) Prepare() {
    var (
        err error
    )
    // 初始化ZooKeeper客户端
    servers := []string{"localhost:2181"}
    serversPath := "/myapp/servers"
    sessionTimeout := 30
    c.zkCli, err = zk.NewZkClient(servers, serversPath, sessionTimeout)
    if err != nil {
        log.Fatal(err)
    }
}

func (c *MyController) Get() {
    // 查询服务列表
    servers, _, err := c.zkCli.WatchServers()
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("get servers error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    // 随机调用一个服务
    if len(servers) == 0 {
        c.Data["json"] = map[string]interface{}{
            "code":    1002,
            "message": "no available servers",
        }
        c.ServeJSON()
        return
    }
    server := servers[rand.Intn(len(servers))]
    url := fmt.Sprintf("http://%s:%d/hello", server.Host, server.Port)
    resp, err := http.Get(url)
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1003,
            "message": fmt.Sprintf("call server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    defer resp.Body.Close()
    result, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1004,
            "message": fmt.Sprintf("read response error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    var respData struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
    }
    if err = json.Unmarshal(result, &respData); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1005,
            "message": fmt.Sprintf("parse response error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = respData
    c.ServeJSON()
}

func (c *MyController) Delete() {
    var (
        server zk.Server
        err    error
    )
    // 解析请求数据
    if err = json.Unmarshal(c.Ctx.Input.RequestBody, &server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": "invalid parameters",
        }
        c.ServeJSON()
        return
    }
    // 删除ZooKeeper中保存的服务节点
    if err = c.zkCli.Remove(&server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("delete server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = map[string]interface{}{
        "code":    200,
        "message": "success",
    }
    c.ServeJSON()
}

func (c *MyController) Post() {
    var (
        server zk.Server
        err    error
    )
    // 解析请求数据
    if err = json.Unmarshal(c.Ctx.Input.RequestBody, &server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": "invalid parameters",
        }
        c.ServeJSON()
        return
    }
    // 注册服务到ZooKeeper
    if err = c.zkCli.Register(&server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("register server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = map[string]interface{}{
        "code":    200,
        "message": "success",
    }
    c.ServeJSON()
}

func (c *MyController) Finish() {
    // 关闭ZooKeeper客户端
    c.zkCli.Close()
}

IV. Zusammenfassung

In diesem Artikel haben wir vorgestellt, wie man ZooKeeper verwendet, um die Dienstregistrierung und -erkennung im Beego-Framework zu implementieren. Wir haben zunächst die Installation und Konfiguration von ZooKeeper vorgestellt und dann einen Beispielcode geschrieben, der den ZooKeeper-Client kapselt. Abschließend haben wir das Beego-Framework verwendet, um zu demonstrieren, wie dieser Beispielcode zur Implementierung der Dienstregistrierung und -erkennung verwendet wird. Ich hoffe, dieser Artikel ist für alle hilfreich.

Das obige ist der detaillierte Inhalt vonVerwendung von ZooKeeper zur Implementierung der Dienstregistrierung und -erkennung in Beego. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!

Stellungnahme:
Der Inhalt dieses Artikels wird freiwillig von Internetnutzern beigesteuert und das Urheberrecht liegt beim ursprünglichen Autor. Diese Website übernimmt keine entsprechende rechtliche Verantwortung. Wenn Sie Inhalte finden, bei denen der Verdacht eines Plagiats oder einer Rechtsverletzung besteht, wenden Sie sich bitte an admin@php.cn