>  기사  >  백엔드 개발  >  Go 언어로 구현된 로그 수집 시스템에 대한 자세한 그래픽 및 텍스트 설명

Go 언어로 구현된 로그 수집 시스템에 대한 자세한 그래픽 및 텍스트 설명

尚
앞으로
2019-11-27 15:47:354057검색

Go 언어로 구현된 로그 수집 시스템에 대한 자세한 그래픽 및 텍스트 설명

이 로그 수집 시스템의 프레임워크를 아래와 같이 정리했습니다

Go 언어로 구현된 로그 수집 시스템에 대한 자세한 그래픽 및 텍스트 설명이번에 구현할 코드의 전체적인 로직은 다음과 같습니다.

Go 언어로 구현된 로그 수집 시스템에 대한 자세한 그래픽 및 텍스트 설명전체 코드 주소는 https://github입니다. com/pythonsite/logagent

etcd 소개

구성 공유 및 서비스 검색에 사용할 수 있는 고가용성 분산 키-값 저장소

유사 프로젝트: Zookeeper 및 consul

개발 언어: go

인터페이스 : 사용하기 쉬운 편안한 인터페이스 제공

구현 알고리즘: raft 알고리즘 기반의 강력한 일관성, 고가용성 서비스 저장 디렉터리

etcd 응용 시나리오:

1. 서비스 검색 및 서비스 등록

2. 클라이언트가 사용해야 함)

3. 분산 잠금

4. 마스터 선거

공식 웹사이트에는 etcd에 대한 매우 간결한 소개가 있습니다:

Go 언어로 구현된 로그 수집 시스템에 대한 자세한 그래픽 및 텍스트 설명

etcd 구성:
다운로드 주소: https://github. com/coreos/etcd/releases/
자신의 환경에 맞게 해당 버전을 다운받아 실행하시면 됩니다

시작 후 다음 명령어로 확인 가능합니다:

[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan 

zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]#

context 소개 및 사용

사실 이게 컨텍스트 관리인데 컨텍스트의 역할은 주로 다음 두 가지 기능을 가지고 있습니다.

1. 고루틴의 타임아웃 제어

2. 컨텍스트 데이터 저장

다음의 간단한 예를 통해 이해해 보세요. :

package main

import (
    "fmt"
    "time"
    "net/http"
    "context"
    "io/ioutil"
)


type Result struct{
    r *http.Response
    err error
}

func process(){
    ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
    defer cancel()
    tr := &http.Transport{}
    client := &http.Client{Transport:tr}
    c := make(chan Result,1)
    req,err := http.NewRequest("GET","http://www.google.com",nil)
    if err != nil{
        fmt.Println("http request failed,err:",err)
        return
    }
    // 如果请求成功了会将数据存入到管道中
    go func(){
        resp,err := client.Do(req)
        pack := Result{resp,err}
        c <- pack
    }()

    select{
    case <- ctx.Done():
        tr.CancelRequest(req)
        fmt.Println("timeout!")
    case res := <-c:
        defer res.r.Body.Close()
        out,_:= ioutil.ReadAll(res.r.Body)
        fmt.Printf("server response:%s",out)
    }
    return

}

func main() {
    process()
}

context를 통해 context를 저장하는 context를 작성합니다. 코드 예시는 다음과 같습니다.

package main

import (
    "github.com/Go-zh/net/context"
    "fmt"
)

func add(ctx context.Context,a,b int) int {
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n",traceId)
    return a+b
}

func calc(ctx context.Context,a, b int) int{
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n",traceId)
    //再将ctx传入到add中
    return add(ctx,a,b)
}

func main() {
    //将ctx传递到calc中
    ctx := context.WithValue(context.Background(),"trace_id","123456")
    calc(ctx,20,30)

}

etcd와 context를 함께 사용합니다

go를 통해 etcd를 연결하는 간단한 예시가 있습니다. 여기서 주목해야 할 작은 문제는 기본적으로 시작되는 etcd의 시작 방법입니다. 특히 가상으로 설치하는 경우 연결이 되지 않을 수 있으므로 다음 명령으로 시작해야 합니다.
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http: //0.0.0.0:2371 --listen-peer-urls http://0.0.0.0: 2381
)

package main

import (
    etcd_client "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
)

func main() {
    cli, err := etcd_client.New(etcd_client.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }

    fmt.Println("connect success")
    defer cli.Close()
}

다음 예제는 etcd

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }
    fmt.Println("connect succ")
    defer cli.Close()
    ctx,cancel := context.WithTimeout(context.Background(),time.Second)
    _,err = cli.Put(ctx,"logagent/conf/","sample_value")
    cancel()
    if err != nil{
        fmt.Println("put failed,err",err)
        return
    }
    ctx, cancel = context.WithTimeout(context.Background(),time.Second)
    resp,err := cli.Get(ctx,"logagent/conf/")
    cancel()
    if err != nil{
        fmt.Println("get failed,err:",err)
        return
    }
    for _,ev := range resp.Kvs{
        fmt.Printf("%s:%s\n",ev.Key,ev.Value)
    }
}
에 연결하여 값을 저장하고 가져오는 것입니다

package main

import (
    "context"
    "fmt"
)

func main() {
    // gen generates integers in a separate goroutine and
    // sends them to the returned channel.
    // The callers of gen need to cancel the context once
    // they are done consuming generated integers not to leak
    // the internal goroutine started by gen.
    gen := func(ctx context.Context) <-chan int {
        dst := make(chan int)
        n := 1
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return // returning not to leak the goroutine
                case dst <- n:
                    n++
                }
            }
        }()
        return dst
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // cancel when we are finished consuming integers

    for n := range gen(ctx) {
        fmt.Println(n)
        if n == 5 {
            break
        }
    }
}

컨텍스트 공식 웹사이트에도 예제가 있습니다. 매우 유용하며 열린 고루틴의 종료를 제어하는 ​​데 사용됩니다. 코드는 다음과 같습니다. :

package main


import (
    "context"
    "fmt"
    "time"
)

func main() {
    d := time.Now().Add(50 * time.Millisecond)
    ctx, cancel := context.WithDeadline(context.Background(), d)

    // Even though ctx will be expired, it is good practice to call its
    // cancelation function in any case. Failure to do so may keep the
    // context and its parent alive longer than necessary.
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err())
    }

}

공식 홈페이지 문서에 있는 WithDeadline 데모의 코드 예시에 대해 :

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:",err)
        return
    }
    defer cli.Close()
    // 这里会阻塞
    rch := cli.Watch(context.Background(),"logagent/conf/")
    for wresp := range rch{
        for _,ev := range wresp.Events{
            fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}

위 코드를 보면 기본적인 사용법은 있고, etcd를 통해 하면 구성 관리, 구성이 변경되면 어떻게 알릴까요? 다음 예에서 볼 수 있듯이 구성 변경에 해당하는 서버:

package main

import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "time"
)

func main() {
    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
    }
    partitionList,err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
            for msg := range pc.Messages(){
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
            }
        }(pc)
    }
    time.Sleep(time.Hour)
    consumer.Close()

}

kafka 소비자 코드 구현의 간단한 예:

package main

import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "sync"
)

var (
    wg sync.WaitGroup
)

func main() {
    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
    }
    partitionList,err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
            wg.Add(1)
            for msg := range partitionConsumer.Messages(){
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
            }
            wg.Done()
        }(pc)
    }

    //time.Sleep(time.Hour)
    wg.Wait()
    consumer.Close()

}

그러나 위 코드는 최상의 코드가 아닙니다. 왜냐하면 우리는 마침내 고루틴의 실행을 기다리기 때문입니다. time.sleep을 통해 sync.WaitGroup을 사용하도록 변경할 수 있습니다.

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "github.com/astaxie/beego/logs"
    "context"
    "fmt"
)

var Client *clientv3.Client
var logConfChan chan string


// 初始化etcd
func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){

    var keys []string
    for _,ip := range ipArrays{
        //keyfmt = /logagent/%s/log_config
        keys = append(keys,fmt.Sprintf(keyfmt,ip))
    }

    logConfChan = make(chan string,10)
    logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)

    Client,err = clientv3.New(clientv3.Config{
        Endpoints:addr,
        DialTimeout: timeout,
    })
    if err != nil{
        logs.Error("connect failed,err:%v",err)
        return
    }
    logs.Debug("init etcd success")
    waitGroup.Add(1)
    for _, key := range keys{
        ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
        // 从etcd中获取要收集日志的信息
        resp,err := Client.Get(ctx,key)
        cancel()
        if err != nil {
            logs.Warn("get key %s failed,err:%v",key,err)
            continue
        }

        for _, ev := range resp.Kvs{
            logs.Debug("%q : %q\n",  ev.Key, ev.Value)
            logConfChan <- string(ev.Value)
        }
    }
    go WatchEtcd(keys)
    return
}

func WatchEtcd(keys []string){
    // 这里用于检测当需要收集的日志信息更改时及时更新
    var watchChans []clientv3.WatchChan
    for _,key := range keys{
        rch := Client.Watch(context.Background(),key)
        watchChans = append(watchChans,rch)
    }

    for {
        for _,watchC := range watchChans{
            select{
            case wresp := <-watchC:
                for _,ev:= range wresp.Events{
                    logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    logConfChan <- string(ev.Kv.Value)
                }
            default:

            }
        }
        time.Sleep(time.Second)
    }
    waitGroup.Done()
}

func GetLogConf()chan string{
    return logConfChan
}
클라이언트가 수집해야 하는 로그 정보를 etcd

etcd 처리 코드는

package main

import (
    "time"
    "sync/atomic"
    "github.com/astaxie/beego/logs"
)

type SecondLimit struct {
    unixSecond int64
    curCount int32
    limit int32
}

func NewSecondLimit(limit int32) *SecondLimit {
    secLimit := &SecondLimit{
        unixSecond:time.Now().Unix(),
        curCount:0,
        limit:limit,
    }
    return secLimit
}

func (s *SecondLimit) Add(count int) {
    sec := time.Now().Unix()
    if sec == s.unixSecond {
        atomic.AddInt32(&s.curCount,int32(count))
        return
    }
    atomic.StoreInt64(&s.unixSecond,sec)
    atomic.StoreInt32(&s.curCount, int32(count))
}

func (s *SecondLimit) Wait()bool {
    for {
        sec := time.Now().Unix()
        if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {
            time.Sleep(time.Microsecond)
            logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)
            continue
        }

        if sec != atomic.LoadInt64(&s.unixSecond) {
            atomic.StoreInt64(&s.unixSecond,sec)
            atomic.StoreInt32(&s.curCount,0)
        }
        logs.Debug("limit is exited")
        return false
    }
}

마찬가지로 속도 제한입니다. 결국 여기에 처리가 추가되므로 로그 수집 프로그램은 현재 비즈니스의 성능에 영향을 미칠 수 없으므로 속도를 제한하기 위해 Limit.go가 추가됩니다.

rrreee추천: go 언어 튜토리얼

🎜

위 내용은 Go 언어로 구현된 로그 수집 시스템에 대한 자세한 그래픽 및 텍스트 설명의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 cnblogs.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제