Rumah >pembangunan bahagian belakang >Golang >Menggunakan ZooKeeper untuk melaksanakan pendaftaran perkhidmatan dan penemuan dalam Beego

Menggunakan ZooKeeper untuk melaksanakan pendaftaran perkhidmatan dan penemuan dalam Beego

WBOY
WBOYasal
2023-06-22 08:21:151057semak imbas

Dalam seni bina perkhidmatan mikro, pendaftaran perkhidmatan dan penemuan adalah isu yang sangat penting. Untuk menyelesaikan masalah ini, kami boleh menggunakan ZooKeeper sebagai pusat pendaftaran perkhidmatan. Dalam artikel ini, kami akan memperkenalkan cara menggunakan ZooKeeper dalam rangka kerja Beego untuk melaksanakan pendaftaran dan penemuan perkhidmatan.

1. Pengenalan kepada ZooKeeper

ZooKeeper ialah perkhidmatan penyelarasan teragih sumber terbuka Ia adalah salah satu sub-projek Apache Hadoop. Peranan utama ZooKeeper adalah untuk menyelaraskan aplikasi yang diedarkan dan menyediakan fungsi seperti kunci yang diedarkan, perkhidmatan penamaan, pengurusan konfigurasi dan penyegerakan yang diedarkan. Dalam seni bina perkhidmatan mikro, ZooKeeper sering digunakan sebagai pendaftaran perkhidmatan.

2. Pemasangan dan konfigurasi ZooKeeper

Untuk pemasangan dan konfigurasi ZooKeeper, sila rujuk dokumentasi laman web rasmi: https://zookeeper.apache.org/doc/r3.6.3/ index.html. Di sini kami hanya memperkenalkan beberapa item konfigurasi yang biasa digunakan. ZooKeeper boleh dikonfigurasikan dalam fail konfigurasi ZooKeeper zoo.cfg.

Berikut ialah beberapa item konfigurasi penting:

  • tickTime: Ini ialah masa asas yang digunakan oleh ZooKeeper, dalam milisaat. Umumnya ditetapkan kepada 2000ms.
  • initLimit: Ini ialah had masa untuk mengambil bahagian dalam pilihan raya Masa maksimum untuk memulakan sambungan antara pelayan ZooKeeper, dinyatakan dalam gandaan tickTime. Umumnya ditetapkan kepada 5.
  • syncLimit: Ini ialah had masa untuk menerima transaksi terkini daripada Pemimpin, dinyatakan dalam gandaan tickTime. Umumnya ditetapkan kepada 2.
  • dataDir: Ini ialah direktori data ZooKeeper, iaitu /var/lib/zookeeper secara lalai.
  • clientPort: Ini ialah port sambungan klien yang didengari oleh pelayan ZooKeeper. Lalai ialah 2181.

3. Rangka kerja Beego mengintegrasikan ZooKeeper

  1. Pengenalan perpustakaan pelanggan ZooKeeper

Menggunakan ZooKeeper dalam rangka kerja klien Beego memerlukan pengenalan perpustakaan pelanggan ZooKeeper . Anda boleh menggunakan pustaka klien go zookeeper zk untuk melaksanakan operasi klien ZooKeeper.

Anda boleh menggunakan arahan berikut untuk memasang:

pergi dapatkan github.com/samuel/go-zookeeper/zk

  1. Laksanakan pendaftaran dan penemuan perkhidmatan

Dalam rangka kerja Beego, kami boleh merangkum objek pelanggan ZooKeeper untuk pendaftaran perkhidmatan dan penemuan ZooKeeper.

Berikut ialah contoh kod untuk pelanggan ZooKeeper:

package zk

import (
    "encoding/json"
    "fmt"
    "strings"
    "time"

    "github.com/samuel/go-zookeeper/zk"
)

type Server struct {
    Host string `json:"host"`
    Port int    `json:"port"`
}

type ZkClient struct {
    hosts          []string
    conn           *zk.Conn
    serversPath    string
    sessionTimeout time.Duration
}

func NewZkClient(hosts []string, serversPath string, sessionTimeout int) (*ZkClient, error) {
    // 链接zk,创建授权节点 /servers
    c, _, err := zk.Connect(hosts, time.Duration(sessionTimeout)*time.Second)
    if err != nil {
        return nil, err
    }
    if exists, _, err := c.Exists(serversPath); err != nil {
        return nil, err
    } else if !exists {
        if _, err := c.Create(serversPath, nil, 0, zk.WorldACL(zk.PermAll)); err != nil {
            return nil, fmt.Errorf("create znode error(%v)", err)
        }
    }
    return &ZkClient{
        hosts:          hosts,
        conn:           c,
        serversPath:    serversPath,
        sessionTimeout: time.Duration(sessionTimeout) * time.Second,
    }, nil
}

func (zk *ZkClient) Close() {
    zk.conn.Close()
}

// 检测授权节点是否存在
func (zk *ZkClient) ensureServerPath() error {
    exists, _, err := zk.conn.Exists(zk.serversPath)
    if err != nil {
        return err
    }
    if !exists {
        _, err = zk.conn.Create(zk.serversPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
    }
    return err
}

func (zk *ZkClient) Register(server *Server) error {
    if err := zk.ensureServerPath(); err != nil {
        return fmt.Errorf("register: ensureServerPath error(%v)", err)
    }
    //在 /servers 节点下创建一个临时性节点,节点名为 IP:Port。
    path := fmt.Sprintf("%s/%s:%d", zk.serversPath, server.Host, server.Port)
    if _, err := zk.conn.Create(path, []byte(""), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)); err != nil {
        return fmt.Errorf("register: create error(%v)", err)
    }
    return nil
}

// 获取所有服务器列表
func (zk *ZkClient) GetServers() ([]Server, error) {
    list, _, err := zk.conn.Children(zk.serversPath)
    if err != nil {
        return nil, err
    }
    servers := make([]Server, 0, len(list))
    for _, node := range list {
        data, _, err := zk.conn.Get(zk.serversPath + "/" + node)
        if err != nil {
            continue
        }
        arr := strings.Split(node, ":")
        servers = append(servers, Server{
            Host: arr[0],
            Port: str2Int(arr[1]),
        })
    }
    return servers, nil
}

func (zk *ZkClient) WatchServers() ([]Server, <-chan zk.Event, error) {
    list, _, ch, err := zk.conn.ChildrenW(zk.serversPath)
    if err != nil {
        return nil, nil, err
    }
    servers := make([]Server, 0, len(list))
    for _, node := range list {
        data, _, err := zk.conn.Get(zk.serversPath + "/" + node)
        if err != nil {
            continue
        }
        arr := strings.Split(node, ":")
        servers = append(servers, Server{
            Host: arr[0],
            Port: str2Int(arr[1]),
        })
    }
    return servers, ch, nil
}

// 删除授权节点
func (zk *ZkClient) Remove(server *Server) error {
    path := fmt.Sprintf("%s/%s:%d", zk.serversPath, server.Host, server.Port)
    return zk.conn.Delete(path, -1)
}

func str2Int(str string) int {
    var (
        num int
        err error
    )
    if num, err = strconv.Atoi(str); err != nil {
        panic(err)
    }
    return num
}

Selepas melaksanakan objek pelanggan ZooKeeper, kami boleh menggunakan objek untuk mendaftar dan memanggil perkhidmatan.

Dalam rangka kerja Beego, kita boleh mencipta objek klien ZooKeeper dalam fungsi permulaan. Pada masa yang sama, dalam pemprosesan permintaan API, kami boleh menggunakan objek ini untuk menemui dan memanggil perkhidmatan.

Berikut ialah contoh kod untuk mendaftar dan memanggil perkhidmatan menggunakan ZooKeeper:

package controllers

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/astaxie/beego"
    "github.com/my/go-zk"
)

type MyController struct {
    beego.Controller
    zkCli *zk.ZkClient
}

func (c *MyController) Prepare() {
    var (
        err error
    )
    // 初始化ZooKeeper客户端
    servers := []string{"localhost:2181"}
    serversPath := "/myapp/servers"
    sessionTimeout := 30
    c.zkCli, err = zk.NewZkClient(servers, serversPath, sessionTimeout)
    if err != nil {
        log.Fatal(err)
    }
}

func (c *MyController) Get() {
    // 查询服务列表
    servers, _, err := c.zkCli.WatchServers()
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("get servers error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    // 随机调用一个服务
    if len(servers) == 0 {
        c.Data["json"] = map[string]interface{}{
            "code":    1002,
            "message": "no available servers",
        }
        c.ServeJSON()
        return
    }
    server := servers[rand.Intn(len(servers))]
    url := fmt.Sprintf("http://%s:%d/hello", server.Host, server.Port)
    resp, err := http.Get(url)
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1003,
            "message": fmt.Sprintf("call server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    defer resp.Body.Close()
    result, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1004,
            "message": fmt.Sprintf("read response error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    var respData struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
    }
    if err = json.Unmarshal(result, &respData); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1005,
            "message": fmt.Sprintf("parse response error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = respData
    c.ServeJSON()
}

func (c *MyController) Delete() {
    var (
        server zk.Server
        err    error
    )
    // 解析请求数据
    if err = json.Unmarshal(c.Ctx.Input.RequestBody, &server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": "invalid parameters",
        }
        c.ServeJSON()
        return
    }
    // 删除ZooKeeper中保存的服务节点
    if err = c.zkCli.Remove(&server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("delete server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = map[string]interface{}{
        "code":    200,
        "message": "success",
    }
    c.ServeJSON()
}

func (c *MyController) Post() {
    var (
        server zk.Server
        err    error
    )
    // 解析请求数据
    if err = json.Unmarshal(c.Ctx.Input.RequestBody, &server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": "invalid parameters",
        }
        c.ServeJSON()
        return
    }
    // 注册服务到ZooKeeper
    if err = c.zkCli.Register(&server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("register server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = map[string]interface{}{
        "code":    200,
        "message": "success",
    }
    c.ServeJSON()
}

func (c *MyController) Finish() {
    // 关闭ZooKeeper客户端
    c.zkCli.Close()
}

4 Ringkasan

Dalam artikel ini, kami memperkenalkan cara menggunakan ZooKeeper dalam rangka kerja Beego. Pendaftaran perkhidmatan dan penemuan. Kami mula-mula memperkenalkan pemasangan dan konfigurasi ZooKeeper, dan kemudian menulis kod sampel yang merangkumi klien ZooKeeper. Akhir sekali, kami menggunakan rangka kerja Beego untuk menunjukkan cara menggunakan kod sampel ini untuk melaksanakan pendaftaran dan penemuan perkhidmatan. Semoga artikel ini bermanfaat kepada semua.

Atas ialah kandungan terperinci Menggunakan ZooKeeper untuk melaksanakan pendaftaran perkhidmatan dan penemuan dalam Beego. Untuk maklumat lanjut, sila ikut artikel berkaitan lain di laman web China PHP!

Kenyataan:
Kandungan artikel ini disumbangkan secara sukarela oleh netizen, dan hak cipta adalah milik pengarang asal. Laman web ini tidak memikul tanggungjawab undang-undang yang sepadan. Jika anda menemui sebarang kandungan yang disyaki plagiarisme atau pelanggaran, sila hubungi admin@php.cn