Maison  >  Article  >  développement back-end  >  Utiliser ZooKeeper pour implémenter l'enregistrement et la découverte de services dans Beego

Utiliser ZooKeeper pour implémenter l'enregistrement et la découverte de services dans Beego

WBOY
WBOYoriginal
2023-06-22 08:21:15975parcourir

Dans l'architecture des microservices, l'enregistrement et la découverte des services sont une question très importante. Pour résoudre ce problème, nous pouvons utiliser ZooKeeper comme centre d'enregistrement de services. Dans cet article, nous présenterons comment utiliser ZooKeeper dans le framework Beego pour implémenter l'enregistrement et la découverte de services.

1. Introduction à ZooKeeper

ZooKeeper est un service de coordination distribué open source. C'est l'un des sous-projets d'Apache Hadoop. Le rôle principal de ZooKeeper est de coordonner les applications distribuées et de fournir des fonctions telles que les verrous distribués, les services de noms, la gestion de la configuration et la synchronisation distribuée. Dans l'architecture des microservices, ZooKeeper est souvent utilisé comme registre de services.

2. Installation et configuration de ZooKeeper

Pour l'installation et la configuration de ZooKeeper, veuillez vous référer à la documentation officielle du site : https://zookeeper.apache.org/doc/r3.6.3/index.html. Ici, nous présentons uniquement quelques éléments de configuration couramment utilisés. ZooKeeper peut être configuré dans le fichier de configuration ZooKeeper zoo.cfg.

Voici quelques éléments de configuration importants :

  • tickTime : Il s'agit de l'heure de base utilisée par ZooKeeper, en millisecondes. Généralement réglé sur 2 000 ms.
  • initLimit : Il s'agit du temps limite pour participer à l'élection. Le temps maximum pour qu'une connexion s'établisse entre les serveurs ZooKeeper, exprimé en multiples de tickTime. Généralement réglé sur 5.
  • syncLimit : C'est le délai de réception de la dernière transaction du Leader, exprimé en multiples de tickTime. Généralement réglé sur 2.
  • dataDir : Il s'agit du répertoire de données de ZooKeeper, qui est /var/lib/zookeeper par défaut.
  • clientPort : Il s'agit du port de connexion client que le serveur ZooKeeper écoute. La valeur par défaut est 2181.

3. Le framework Beego intègre ZooKeeper

  1. Présentation de la bibliothèque client ZooKeeper

L'utilisation de ZooKeeper dans le framework Beego nécessite l'introduction de la bibliothèque client ZooKeeper. Vous pouvez utiliser la bibliothèque client go zookeeper zk pour implémenter les opérations du client ZooKeeper.

Vous pouvez utiliser la commande suivante pour installer :

go get github.com/samuel/go-zookeeper/zk

  1. Implémenter l'enregistrement et la découverte du service

Dans le framework Beego, nous pouvons encapsuler un objet client ZooKeeper avec Pour l'inscription au service et la découverte de ZooKeeper.

Ce qui suit est un exemple de code pour un client ZooKeeper :

package zk

import (
    "encoding/json"
    "fmt"
    "strings"
    "time"

    "github.com/samuel/go-zookeeper/zk"
)

type Server struct {
    Host string `json:"host"`
    Port int    `json:"port"`
}

type ZkClient struct {
    hosts          []string
    conn           *zk.Conn
    serversPath    string
    sessionTimeout time.Duration
}

func NewZkClient(hosts []string, serversPath string, sessionTimeout int) (*ZkClient, error) {
    // 链接zk,创建授权节点 /servers
    c, _, err := zk.Connect(hosts, time.Duration(sessionTimeout)*time.Second)
    if err != nil {
        return nil, err
    }
    if exists, _, err := c.Exists(serversPath); err != nil {
        return nil, err
    } else if !exists {
        if _, err := c.Create(serversPath, nil, 0, zk.WorldACL(zk.PermAll)); err != nil {
            return nil, fmt.Errorf("create znode error(%v)", err)
        }
    }
    return &ZkClient{
        hosts:          hosts,
        conn:           c,
        serversPath:    serversPath,
        sessionTimeout: time.Duration(sessionTimeout) * time.Second,
    }, nil
}

func (zk *ZkClient) Close() {
    zk.conn.Close()
}

// 检测授权节点是否存在
func (zk *ZkClient) ensureServerPath() error {
    exists, _, err := zk.conn.Exists(zk.serversPath)
    if err != nil {
        return err
    }
    if !exists {
        _, err = zk.conn.Create(zk.serversPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
    }
    return err
}

func (zk *ZkClient) Register(server *Server) error {
    if err := zk.ensureServerPath(); err != nil {
        return fmt.Errorf("register: ensureServerPath error(%v)", err)
    }
    //在 /servers 节点下创建一个临时性节点,节点名为 IP:Port。
    path := fmt.Sprintf("%s/%s:%d", zk.serversPath, server.Host, server.Port)
    if _, err := zk.conn.Create(path, []byte(""), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)); err != nil {
        return fmt.Errorf("register: create error(%v)", err)
    }
    return nil
}

// 获取所有服务器列表
func (zk *ZkClient) GetServers() ([]Server, error) {
    list, _, err := zk.conn.Children(zk.serversPath)
    if err != nil {
        return nil, err
    }
    servers := make([]Server, 0, len(list))
    for _, node := range list {
        data, _, err := zk.conn.Get(zk.serversPath + "/" + node)
        if err != nil {
            continue
        }
        arr := strings.Split(node, ":")
        servers = append(servers, Server{
            Host: arr[0],
            Port: str2Int(arr[1]),
        })
    }
    return servers, nil
}

func (zk *ZkClient) WatchServers() ([]Server, <-chan zk.Event, error) {
    list, _, ch, err := zk.conn.ChildrenW(zk.serversPath)
    if err != nil {
        return nil, nil, err
    }
    servers := make([]Server, 0, len(list))
    for _, node := range list {
        data, _, err := zk.conn.Get(zk.serversPath + "/" + node)
        if err != nil {
            continue
        }
        arr := strings.Split(node, ":")
        servers = append(servers, Server{
            Host: arr[0],
            Port: str2Int(arr[1]),
        })
    }
    return servers, ch, nil
}

// 删除授权节点
func (zk *ZkClient) Remove(server *Server) error {
    path := fmt.Sprintf("%s/%s:%d", zk.serversPath, server.Host, server.Port)
    return zk.conn.Delete(path, -1)
}

func str2Int(str string) int {
    var (
        num int
        err error
    )
    if num, err = strconv.Atoi(str); err != nil {
        panic(err)
    }
    return num
}

Après avoir implémenté l'objet client ZooKeeper, nous pouvons utiliser cet objet pour nous inscrire et appeler le service.

Dans le framework Beego, nous pouvons créer l'objet client ZooKeeper dans la fonction d'initialisation. Parallèlement, dans le traitement des requêtes API, nous pouvons utiliser cet objet pour découvrir et appeler des services.

Ce qui suit est un exemple de code pour enregistrer et appeler des services à l'aide de ZooKeeper :

package controllers

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/astaxie/beego"
    "github.com/my/go-zk"
)

type MyController struct {
    beego.Controller
    zkCli *zk.ZkClient
}

func (c *MyController) Prepare() {
    var (
        err error
    )
    // 初始化ZooKeeper客户端
    servers := []string{"localhost:2181"}
    serversPath := "/myapp/servers"
    sessionTimeout := 30
    c.zkCli, err = zk.NewZkClient(servers, serversPath, sessionTimeout)
    if err != nil {
        log.Fatal(err)
    }
}

func (c *MyController) Get() {
    // 查询服务列表
    servers, _, err := c.zkCli.WatchServers()
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("get servers error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    // 随机调用一个服务
    if len(servers) == 0 {
        c.Data["json"] = map[string]interface{}{
            "code":    1002,
            "message": "no available servers",
        }
        c.ServeJSON()
        return
    }
    server := servers[rand.Intn(len(servers))]
    url := fmt.Sprintf("http://%s:%d/hello", server.Host, server.Port)
    resp, err := http.Get(url)
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1003,
            "message": fmt.Sprintf("call server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    defer resp.Body.Close()
    result, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1004,
            "message": fmt.Sprintf("read response error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    var respData struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
    }
    if err = json.Unmarshal(result, &respData); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1005,
            "message": fmt.Sprintf("parse response error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = respData
    c.ServeJSON()
}

func (c *MyController) Delete() {
    var (
        server zk.Server
        err    error
    )
    // 解析请求数据
    if err = json.Unmarshal(c.Ctx.Input.RequestBody, &server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": "invalid parameters",
        }
        c.ServeJSON()
        return
    }
    // 删除ZooKeeper中保存的服务节点
    if err = c.zkCli.Remove(&server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("delete server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = map[string]interface{}{
        "code":    200,
        "message": "success",
    }
    c.ServeJSON()
}

func (c *MyController) Post() {
    var (
        server zk.Server
        err    error
    )
    // 解析请求数据
    if err = json.Unmarshal(c.Ctx.Input.RequestBody, &server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": "invalid parameters",
        }
        c.ServeJSON()
        return
    }
    // 注册服务到ZooKeeper
    if err = c.zkCli.Register(&server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("register server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = map[string]interface{}{
        "code":    200,
        "message": "success",
    }
    c.ServeJSON()
}

func (c *MyController) Finish() {
    // 关闭ZooKeeper客户端
    c.zkCli.Close()
}

IV Résumé

Dans cet article, nous avons présenté comment utiliser ZooKeeper pour implémenter l'enregistrement et la découverte de services dans le framework Beego. Nous avons d'abord présenté l'installation et la configuration de ZooKeeper, puis écrit un exemple de code qui encapsule le client ZooKeeper. Enfin, nous avons utilisé le framework Beego pour démontrer comment utiliser cet exemple de code pour implémenter l'enregistrement et la découverte de services. J'espère que cet article sera utile à tout le monde.

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Le contenu de cet article est volontairement contribué par les internautes et les droits d'auteur appartiennent à l'auteur original. Ce site n'assume aucune responsabilité légale correspondante. Si vous trouvez un contenu suspecté de plagiat ou de contrefaçon, veuillez contacter admin@php.cn