Maison >développement back-end >Golang >Explication graphique et textuelle détaillée du système de collecte de journaux implémenté en langage Go

Explication graphique et textuelle détaillée du système de collecte de journaux implémenté en langage Go

尚
avant
2019-11-27 15:47:354091parcourir

Explication graphique et textuelle détaillée du système de collecte de journaux implémenté en langage Go

J'ai réglé le cadre de ce système de collecte de journaux, comme indiqué ci-dessous

Explication graphique et textuelle détaillée du système de collecte de journaux implémenté en langage GoLa logique globale du code à implémenter cette fois est :

Explication graphique et textuelle détaillée du système de collecte de journaux implémenté en langage GoL'adresse complète du code est : https://github.com/pythonsite/logagent

introduction etcd

Le stockage clé-valeur distribué hautement disponible peut être utilisé pour le partage de configuration et la découverte de services

Projets similaires : zookeeper et consul

Langage de développement : go

Interface : fournit une interface reposante , utilisez un simple

algorithme de mise en œuvre : forte cohérence basée sur l'algorithme radeau, répertoire de stockage de services hautement disponible

scénarios d'application etcd :

Découverte de services et enregistrement de services

.

2. Centre de configuration (obligatoire pour le client de collecte de journaux que nous avons implémenté)

3. Verrouillage distribué

4. Élection du maître

Site officiel pour etcd Il existe un. introduction très concise :

Explication graphique et textuelle détaillée du système de collecte de journaux implémenté en langage Go

etcd build :
Adresse de téléchargement : https://github.com/coreos/etcd/releases/
Selon Just download the version correspondante dans votre propre environnement et démarrez-la

Après le démarrage, vous pouvez la vérifier avec la commande suivante :

[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan 

zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]#

introduction du contexte et utilisation

En fait, cette chose se traduit par gestion du contexte. Alors quel est le rôle du contexte ? Il a principalement les deux fonctions suivantes :

1. Contrôler le délai d'attente de la goroutine

2, Enregistrer les données de contexte

Comprenez à travers l'exemple simple suivant :

package main

import (
    "fmt"
    "time"
    "net/http"
    "context"
    "io/ioutil"
)


type Result struct{
    r *http.Response
    err error
}

func process(){
    ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
    defer cancel()
    tr := &http.Transport{}
    client := &http.Client{Transport:tr}
    c := make(chan Result,1)
    req,err := http.NewRequest("GET","http://www.google.com",nil)
    if err != nil{
        fmt.Println("http request failed,err:",err)
        return
    }
    // 如果请求成功了会将数据存入到管道中
    go func(){
        resp,err := client.Do(req)
        pack := Result{resp,err}
        c <- pack
    }()

    select{
    case <- ctx.Done():
        tr.CancelRequest(req)
        fmt.Println("timeout!")
    case res := <-c:
        defer res.r.Body.Close()
        out,_:= ioutil.ReadAll(res.r.Body)
        fmt.Printf("server response:%s",out)
    }
    return

}

func main() {
    process()
}

Écrivez un contexte pour enregistrer le contexte via le contexte, des exemples de code tels que :

package main

import (
    "github.com/Go-zh/net/context"
    "fmt"
)

func add(ctx context.Context,a,b int) int {
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n",traceId)
    return a+b
}

func calc(ctx context.Context,a, b int) int{
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n",traceId)
    //再将ctx传入到add中
    return add(ctx,a,b)
}

func main() {
    //将ctx传递到calc中
    ctx := context.WithValue(context.Background(),"trace_id","123456")
    calc(ctx,20,30)

}

Combinaison d'etcd et de contexte

Un exemple simple de connexion d'etcd via go : (Il y a un petit problème qui nécessite une attention ici est la méthode de démarrage d'etcd. Le démarrage par défaut peut ne vous connectez pas, surtout si vous Il est installé virtuellement, vous devez donc le démarrer avec la commande suivante :
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client- urls http://0.0.0.0 : 2371 --listen-peer-urls http://0.0.0.0:2381
)

package main

import (
    etcd_client "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
)

func main() {
    cli, err := etcd_client.New(etcd_client.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }

    fmt.Println("connect success")
    defer cli.Close()
}

L'exemple suivant consiste à connecter etcd, à enregistrer et à obtenir la valeur

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil{
        fmt.Println("connect failed,err:",err)
        return
    }
    fmt.Println("connect succ")
    defer cli.Close()
    ctx,cancel := context.WithTimeout(context.Background(),time.Second)
    _,err = cli.Put(ctx,"logagent/conf/","sample_value")
    cancel()
    if err != nil{
        fmt.Println("put failed,err",err)
        return
    }
    ctx, cancel = context.WithTimeout(context.Background(),time.Second)
    resp,err := cli.Get(ctx,"logagent/conf/")
    cancel()
    if err != nil{
        fmt.Println("get failed,err:",err)
        return
    }
    for _,ev := range resp.Kvs{
        fmt.Printf("%s:%s\n",ev.Key,ev.Value)
    }
}

À propos du contexte Le site officiel propose également un exemple très utile, qui est utilisé pour contrôler la sortie d'une goroutine ouverte. Le code est le suivant :

package main

import (
    "context"
    "fmt"
)

func main() {
    // gen generates integers in a separate goroutine and
    // sends them to the returned channel.
    // The callers of gen need to cancel the context once
    // they are done consuming generated integers not to leak
    // the internal goroutine started by gen.
    gen := func(ctx context.Context) <-chan int {
        dst := make(chan int)
        n := 1
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return // returning not to leak the goroutine
                case dst <- n:
                    n++
                }
            }
        }()
        return dst
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // cancel when we are finished consuming integers

    for n := range gen(ctx) {
        fmt.Println(n)
        if n == 5 {
            break
        }
    }
}

Concernant l'exemple de code du. Démonstration WithDeadline dans le document du site officiel :

package main


import (
    "context"
    "fmt"
    "time"
)

func main() {
    d := time.Now().Add(50 * time.Millisecond)
    ctx, cancel := context.WithDeadline(context.Background(), d)

    // Even though ctx will be expired, it is good practice to call its
    // cancelation function in any case. Failure to do so may keep the
    // context and its parent alive longer than necessary.
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err())
    }

}

Grâce au code ci-dessus, nous avons Une utilisation de base, alors si nous utilisons etcd pour la gestion de la configuration, si la configuration change, comment notifier les changements de configuration du serveur correspondants ? L'exemple suivant démontre :

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "fmt"
    "context"
)

func main() {
    cli,err := clientv3.New(clientv3.Config{
        Endpoints:[]string{"192.168.0.118:2371"},
        DialTimeout:5*time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:",err)
        return
    }
    defer cli.Close()
    // 这里会阻塞
    rch := cli.Watch(context.Background(),"logagent/conf/")
    for wresp := range rch{
        for _,ev := range wresp.Events{
            fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}

Implémentation simple d'un code consommateur kafka Exemple :

package main

import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "time"
)

func main() {
    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
    }
    partitionList,err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
            for msg := range pc.Messages(){
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
            }
        }(pc)
    }
    time.Sleep(time.Hour)
    consumer.Close()

}

Cependant, le code ci-dessus n'est pas le meilleur code car on finit par attendre l'exécution de goroutine au fil du temps .sleep. Nous pouvons le modifier pour utiliser sync.WaitGroup pour implémenter

package main

import (
    "github.com/Shopify/sarama"
    "strings"
    "fmt"
    "sync"
)

var (
    wg sync.WaitGroup
)

func main() {
    consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil)
    if err != nil{
        fmt.Println("failed to start consumer:",err)
        return
    }
    partitionList,err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:",err)
        return
    }
    fmt.Println(partitionList)
    for partition := range partitionList{
        pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err)
            return
        }
        defer pc.AsyncClose()
        go func(partitionConsumer sarama.PartitionConsumer){
            wg.Add(1)
            for msg := range partitionConsumer.Messages(){
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value))
            }
            wg.Done()
        }(pc)
    }

    //time.Sleep(time.Hour)
    wg.Wait()
    consumer.Close()

}

Mettez les informations de journal que le client doit collecter dans etcd

Le code pour le traitement etcd est :

package main

import (
    "github.com/coreos/etcd/clientv3"
    "time"
    "github.com/astaxie/beego/logs"
    "context"
    "fmt"
)

var Client *clientv3.Client
var logConfChan chan string


// 初始化etcd
func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){

    var keys []string
    for _,ip := range ipArrays{
        //keyfmt = /logagent/%s/log_config
        keys = append(keys,fmt.Sprintf(keyfmt,ip))
    }

    logConfChan = make(chan string,10)
    logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)

    Client,err = clientv3.New(clientv3.Config{
        Endpoints:addr,
        DialTimeout: timeout,
    })
    if err != nil{
        logs.Error("connect failed,err:%v",err)
        return
    }
    logs.Debug("init etcd success")
    waitGroup.Add(1)
    for _, key := range keys{
        ctx,cancel := context.WithTimeout(context.Background(),2*time.Second)
        // 从etcd中获取要收集日志的信息
        resp,err := Client.Get(ctx,key)
        cancel()
        if err != nil {
            logs.Warn("get key %s failed,err:%v",key,err)
            continue
        }

        for _, ev := range resp.Kvs{
            logs.Debug("%q : %q\n",  ev.Key, ev.Value)
            logConfChan <- string(ev.Value)
        }
    }
    go WatchEtcd(keys)
    return
}

func WatchEtcd(keys []string){
    // 这里用于检测当需要收集的日志信息更改时及时更新
    var watchChans []clientv3.WatchChan
    for _,key := range keys{
        rch := Client.Watch(context.Background(),key)
        watchChans = append(watchChans,rch)
    }

    for {
        for _,watchC := range watchChans{
            select{
            case wresp := <-watchC:
                for _,ev:= range wresp.Events{
                    logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    logConfChan <- string(ev.Kv.Value)
                }
            default:

            }
        }
        time.Sleep(time.Second)
    }
    waitGroup.Done()
}

func GetLogConf()chan string{
    return logConfChan
}

De même, ajoutez la bonne limite ici. Après tout, le programme de collecte de journaux ne peut pas affecter les performances de l'activité en cours, donc limit.go est ajouté pour limiter la vitesse :

package main

import (
    "time"
    "sync/atomic"
    "github.com/astaxie/beego/logs"
)

type SecondLimit struct {
    unixSecond int64
    curCount int32
    limit int32
}

func NewSecondLimit(limit int32) *SecondLimit {
    secLimit := &SecondLimit{
        unixSecond:time.Now().Unix(),
        curCount:0,
        limit:limit,
    }
    return secLimit
}

func (s *SecondLimit) Add(count int) {
    sec := time.Now().Unix()
    if sec == s.unixSecond {
        atomic.AddInt32(&s.curCount,int32(count))
        return
    }
    atomic.StoreInt64(&s.unixSecond,sec)
    atomic.StoreInt32(&s.curCount, int32(count))
}

func (s *SecondLimit) Wait()bool {
    for {
        sec := time.Now().Unix()
        if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit {
            time.Sleep(time.Microsecond)
            logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount)
            continue
        }

        if sec != atomic.LoadInt64(&s.unixSecond) {
            atomic.StoreInt64(&s.unixSecond,sec)
            atomic.StoreInt32(&s.curCount,0)
        }
        logs.Debug("limit is exited")
        return false
    }
}

Recommandé : tutoriel de langue go

Ce qui précède est le contenu détaillé de. pour plus d'informations, suivez d'autres articles connexes sur le site Web de PHP en chinois!

Déclaration:
Cet article est reproduit dans:. en cas de violation, veuillez contacter admin@php.cn Supprimer