>백엔드 개발 >Golang >ZooKeeper를 사용하여 Beego에서 서비스 등록 및 검색 구현

ZooKeeper를 사용하여 Beego에서 서비스 등록 및 검색 구현

WBOY
WBOY원래의
2023-06-22 08:21:151057검색

마이크로서비스 아키텍처에서 서비스 등록 및 검색은 매우 중요한 문제입니다. 이 문제를 해결하기 위해 ZooKeeper를 서비스 등록 센터로 사용할 수 있습니다. 이 기사에서는 Beego 프레임워크에서 ZooKeeper를 사용하여 서비스 등록 및 검색을 구현하는 방법을 소개합니다.

1. ZooKeeper 소개

ZooKeeper는 분산형 오픈소스 분산 조정 서비스입니다. Apache Hadoop의 하위 프로젝트 중 하나입니다. ZooKeeper의 주요 역할은 분산 애플리케이션을 조정하고 분산 잠금, 이름 지정 서비스, 구성 관리 및 분산 동기화와 같은 기능을 제공하는 것입니다. 마이크로서비스 아키텍처에서 ZooKeeper는 서비스 레지스트리로 사용되는 경우가 많습니다.

2. ZooKeeper 설치 및 구성

ZooKeeper 설치 및 구성은 공식 웹사이트 설명서(https://zookeeper.apache.org/doc/r3.6.3/index.html)를 참조하세요. 여기에서는 일반적으로 사용되는 일부 구성 항목만 소개합니다. ZooKeeper는 ZooKeeper 구성 파일 Zoo.cfg에서 구성할 수 있습니다.

다음은 몇 가지 중요한 구성 항목입니다.

  • tickTime: ZooKeeper에서 사용하는 기본 시간(밀리초)입니다. 일반적으로 2000ms로 설정됩니다.
  • initLimit: 선거에 참여하기 위한 시간 제한입니다. ZooKeeper 서버 간에 연결이 시작되는 최대 시간으로, TickTime의 배수로 표시됩니다. 일반적으로 5로 설정됩니다.
  • syncLimit: 리더로부터 최신 트랜잭션을 수신하는 데 걸리는 시간 제한으로, TickTime의 배수로 표시됩니다. 일반적으로 2로 설정됩니다.
  • dataDir: ZooKeeper의 데이터 디렉터리이며 기본적으로 /var/lib/zookeeper입니다.
  • clientPort: ZooKeeper 서버가 수신하는 클라이언트 연결 포트입니다. 기본값은 2181입니다.

3. Beego 프레임워크는 ZooKeeper를 통합합니다

  1. ZooKeeper 클라이언트 라이브러리 소개

Beego 프레임워크에서 ZooKeeper를 사용하려면 ZooKeeper 클라이언트 라이브러리를 도입해야 합니다. Go ZooKeeper 클라이언트 라이브러리 zk를 사용하여 ZooKeeper 클라이언트 작업을 구현할 수 있습니다.

다음 명령을 사용하여 설치할 수 있습니다.

go get github.com/samuel/go-zookeeper/zk

  1. 서비스 등록 및 검색 구현

Beego 프레임워크에서는 ZooKeeper 클라이언트 개체를 다음과 같이 캡슐화할 수 있습니다. ZooKeeper의 서비스 등록 및 검색을 위해.

다음은 ZooKeeper 클라이언트의 샘플 코드입니다.

package zk

import (
    "encoding/json"
    "fmt"
    "strings"
    "time"

    "github.com/samuel/go-zookeeper/zk"
)

type Server struct {
    Host string `json:"host"`
    Port int    `json:"port"`
}

type ZkClient struct {
    hosts          []string
    conn           *zk.Conn
    serversPath    string
    sessionTimeout time.Duration
}

func NewZkClient(hosts []string, serversPath string, sessionTimeout int) (*ZkClient, error) {
    // 链接zk,创建授权节点 /servers
    c, _, err := zk.Connect(hosts, time.Duration(sessionTimeout)*time.Second)
    if err != nil {
        return nil, err
    }
    if exists, _, err := c.Exists(serversPath); err != nil {
        return nil, err
    } else if !exists {
        if _, err := c.Create(serversPath, nil, 0, zk.WorldACL(zk.PermAll)); err != nil {
            return nil, fmt.Errorf("create znode error(%v)", err)
        }
    }
    return &ZkClient{
        hosts:          hosts,
        conn:           c,
        serversPath:    serversPath,
        sessionTimeout: time.Duration(sessionTimeout) * time.Second,
    }, nil
}

func (zk *ZkClient) Close() {
    zk.conn.Close()
}

// 检测授权节点是否存在
func (zk *ZkClient) ensureServerPath() error {
    exists, _, err := zk.conn.Exists(zk.serversPath)
    if err != nil {
        return err
    }
    if !exists {
        _, err = zk.conn.Create(zk.serversPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
    }
    return err
}

func (zk *ZkClient) Register(server *Server) error {
    if err := zk.ensureServerPath(); err != nil {
        return fmt.Errorf("register: ensureServerPath error(%v)", err)
    }
    //在 /servers 节点下创建一个临时性节点,节点名为 IP:Port。
    path := fmt.Sprintf("%s/%s:%d", zk.serversPath, server.Host, server.Port)
    if _, err := zk.conn.Create(path, []byte(""), zk.FlagEphemeral, zk.WorldACL(zk.PermAll)); err != nil {
        return fmt.Errorf("register: create error(%v)", err)
    }
    return nil
}

// 获取所有服务器列表
func (zk *ZkClient) GetServers() ([]Server, error) {
    list, _, err := zk.conn.Children(zk.serversPath)
    if err != nil {
        return nil, err
    }
    servers := make([]Server, 0, len(list))
    for _, node := range list {
        data, _, err := zk.conn.Get(zk.serversPath + "/" + node)
        if err != nil {
            continue
        }
        arr := strings.Split(node, ":")
        servers = append(servers, Server{
            Host: arr[0],
            Port: str2Int(arr[1]),
        })
    }
    return servers, nil
}

func (zk *ZkClient) WatchServers() ([]Server, <-chan zk.Event, error) {
    list, _, ch, err := zk.conn.ChildrenW(zk.serversPath)
    if err != nil {
        return nil, nil, err
    }
    servers := make([]Server, 0, len(list))
    for _, node := range list {
        data, _, err := zk.conn.Get(zk.serversPath + "/" + node)
        if err != nil {
            continue
        }
        arr := strings.Split(node, ":")
        servers = append(servers, Server{
            Host: arr[0],
            Port: str2Int(arr[1]),
        })
    }
    return servers, ch, nil
}

// 删除授权节点
func (zk *ZkClient) Remove(server *Server) error {
    path := fmt.Sprintf("%s/%s:%d", zk.serversPath, server.Host, server.Port)
    return zk.conn.Delete(path, -1)
}

func str2Int(str string) int {
    var (
        num int
        err error
    )
    if num, err = strconv.Atoi(str); err != nil {
        panic(err)
    }
    return num
}

ZooKeeper 클라이언트 개체를 구현한 후 이 개체를 사용하여 서비스를 등록하고 호출할 수 있습니다.

Beego 프레임워크에서는 초기화 함수에서 ZooKeeper 클라이언트 개체를 생성할 수 있습니다. 동시에 API 요청 처리에서 이 개체를 사용하여 서비스를 검색하고 호출할 수 있습니다.

다음은 ZooKeeper를 사용하여 서비스를 등록하고 호출하기 위한 샘플 코드입니다.

package controllers

import (
    "encoding/json"
    "fmt"
    "log"

    "github.com/astaxie/beego"
    "github.com/my/go-zk"
)

type MyController struct {
    beego.Controller
    zkCli *zk.ZkClient
}

func (c *MyController) Prepare() {
    var (
        err error
    )
    // 初始化ZooKeeper客户端
    servers := []string{"localhost:2181"}
    serversPath := "/myapp/servers"
    sessionTimeout := 30
    c.zkCli, err = zk.NewZkClient(servers, serversPath, sessionTimeout)
    if err != nil {
        log.Fatal(err)
    }
}

func (c *MyController) Get() {
    // 查询服务列表
    servers, _, err := c.zkCli.WatchServers()
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("get servers error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    // 随机调用一个服务
    if len(servers) == 0 {
        c.Data["json"] = map[string]interface{}{
            "code":    1002,
            "message": "no available servers",
        }
        c.ServeJSON()
        return
    }
    server := servers[rand.Intn(len(servers))]
    url := fmt.Sprintf("http://%s:%d/hello", server.Host, server.Port)
    resp, err := http.Get(url)
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1003,
            "message": fmt.Sprintf("call server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    defer resp.Body.Close()
    result, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1004,
            "message": fmt.Sprintf("read response error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    var respData struct {
        Code int    `json:"code"`
        Msg  string `json:"msg"`
    }
    if err = json.Unmarshal(result, &respData); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1005,
            "message": fmt.Sprintf("parse response error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = respData
    c.ServeJSON()
}

func (c *MyController) Delete() {
    var (
        server zk.Server
        err    error
    )
    // 解析请求数据
    if err = json.Unmarshal(c.Ctx.Input.RequestBody, &server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": "invalid parameters",
        }
        c.ServeJSON()
        return
    }
    // 删除ZooKeeper中保存的服务节点
    if err = c.zkCli.Remove(&server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("delete server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = map[string]interface{}{
        "code":    200,
        "message": "success",
    }
    c.ServeJSON()
}

func (c *MyController) Post() {
    var (
        server zk.Server
        err    error
    )
    // 解析请求数据
    if err = json.Unmarshal(c.Ctx.Input.RequestBody, &server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": "invalid parameters",
        }
        c.ServeJSON()
        return
    }
    // 注册服务到ZooKeeper
    if err = c.zkCli.Register(&server); err != nil {
        c.Data["json"] = map[string]interface{}{
            "code":    1001,
            "message": fmt.Sprintf("register server error(%v)", err),
        }
        c.ServeJSON()
        return
    }
    c.Data["json"] = map[string]interface{}{
        "code":    200,
        "message": "success",
    }
    c.ServeJSON()
}

func (c *MyController) Finish() {
    // 关闭ZooKeeper客户端
    c.zkCli.Close()
}

IV. 요약

이 기사에서는 ZooKeeper를 사용하여 Beego 프레임워크에서 서비스 등록 및 검색을 구현하는 방법을 소개했습니다. 먼저 ZooKeeper의 설치 및 구성을 소개한 다음 ZooKeeper 클라이언트를 캡슐화하는 샘플 코드를 작성했습니다. 마지막으로 Beego 프레임워크를 사용하여 이 샘플 코드를 사용하여 서비스 등록 및 검색을 구현하는 방법을 보여주었습니다. 이 기사가 모든 사람에게 도움이 되기를 바랍니다.

위 내용은 ZooKeeper를 사용하여 Beego에서 서비스 등록 및 검색 구현의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
본 글의 내용은 네티즌들의 자발적인 기여로 작성되었으며, 저작권은 원저작자에게 있습니다. 본 사이트는 이에 상응하는 법적 책임을 지지 않습니다. 표절이나 침해가 의심되는 콘텐츠를 발견한 경우 admin@php.cn으로 문의하세요.