整理了一下這個日誌收集系統的框,如下圖
#這次要實現的程式碼的整體邏輯為:
#完整程式碼位址為: https://github.com/pythonsite/logagent
etcd介紹
高可用的分散式key-value存儲,可以用於配置共享和服務發現
類似的項目:zookeeper和consul
開發語言:go
接口:提供restful的接口,使用簡單
實作演算法:基於raft演算法的強一致性,高可用的服務儲存目錄
etcd的應用程式場景:
1、服務發現和服務註冊
2、設定中心(我們實作的日誌收集客戶端需要用到)
3、分散式鎖定
4、master選舉
##官網對etcd的有一個非常簡潔的介紹: etcd建置:下載網址:https://github.com/coreos/etcd/releases/
根據自己的環境下載對應的版本然後啟動起來就可以了
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan zhaofan [root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name zhaofan [root@localhost etcd-v3.2.18-linux-amd64]#
context 介紹和使用
其實這個東西翻譯過來就是上下文管理,那麼context的作用是做什麼,主要有如下兩個作用:1、控制goroutine的超時2 、保存上下文資料透過下面一個簡單的例子來理解:package main import ( "fmt" "time" "net/http" "context" "io/ioutil" ) type Result struct{ r *http.Response err error } func process(){ ctx,cancel := context.WithTimeout(context.Background(),2*time.Second) defer cancel() tr := &http.Transport{} client := &http.Client{Transport:tr} c := make(chan Result,1) req,err := http.NewRequest("GET","http://www.google.com",nil) if err != nil{ fmt.Println("http request failed,err:",err) return } // 如果请求成功了会将数据存入到管道中 go func(){ resp,err := client.Do(req) pack := Result{resp,err} c <- pack }() select{ case <- ctx.Done(): tr.CancelRequest(req) fmt.Println("timeout!") case res := <-c: defer res.r.Body.Close() out,_:= ioutil.ReadAll(res.r.Body) fmt.Printf("server response:%s",out) } return } func main() { process() }寫一個透過context保存上下文,程式碼範例如:
package main import ( "github.com/Go-zh/net/context" "fmt" ) func add(ctx context.Context,a,b int) int { traceId := ctx.Value("trace_id").(string) fmt.Printf("trace_id:%v\n",traceId) return a+b } func calc(ctx context.Context,a, b int) int{ traceId := ctx.Value("trace_id").(string) fmt.Printf("trace_id:%v\n",traceId) //再将ctx传入到add中 return add(ctx,a,b) } func main() { //将ctx传递到calc中 ctx := context.WithValue(context.Background(),"trace_id","123456") calc(ctx,20,30) }
結合etcd和context使用
關於透過go連接etcd的簡單例子:(這裡有個小問題需要注意就是etcd的啟動方式,預設啟動可能會連接不上,尤其你是在虛擬你安裝,所以需要透過以下指令啟動:./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0: 2371 --listen-peer-urls http://0.0.0.0:2381
)
package main import ( etcd_client "github.com/coreos/etcd/clientv3" "time" "fmt" ) func main() { cli, err := etcd_client.New(etcd_client.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil{ fmt.Println("connect failed,err:",err) return } fmt.Println("connect success") defer cli.Close() }下面一個例子是透過連接etcd,存值並取值
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main() { cli,err := clientv3.New(clientv3.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil{ fmt.Println("connect failed,err:",err) return } fmt.Println("connect succ") defer cli.Close() ctx,cancel := context.WithTimeout(context.Background(),time.Second) _,err = cli.Put(ctx,"logagent/conf/","sample_value") cancel() if err != nil{ fmt.Println("put failed,err",err) return } ctx, cancel = context.WithTimeout(context.Background(),time.Second) resp,err := cli.Get(ctx,"logagent/conf/") cancel() if err != nil{ fmt.Println("get failed,err:",err) return } for _,ev := range resp.Kvs{ fmt.Printf("%s:%s\n",ev.Key,ev.Value) } }關於context官網也有一個例子非常有用,用於控制開啟的goroutine的退出,程式碼如下:
package main import ( "context" "fmt" ) func main() { // gen generates integers in a separate goroutine and // sends them to the returned channel. // The callers of gen need to cancel the context once // they are done consuming generated integers not to leak // the internal goroutine started by gen. gen := func(ctx context.Context) <-chan int { dst := make(chan int) n := 1 go func() { for { select { case <-ctx.Done(): return // returning not to leak the goroutine case dst <- n: n++ } } }() return dst } ctx, cancel := context.WithCancel(context.Background()) defer cancel() // cancel when we are finished consuming integers for n := range gen(ctx) { fmt.Println(n) if n == 5 { break } } }關於官網文檔中的WithDeadline演示的程式碼範例:
package main import ( "context" "fmt" "time" ) func main() { d := time.Now().Add(50 * time.Millisecond) ctx, cancel := context.WithDeadline(context.Background(), d) // Even though ctx will be expired, it is good practice to call its // cancelation function in any case. Failure to do so may keep the // context and its parent alive longer than necessary. defer cancel() select { case <-time.After(1 * time.Second): fmt.Println("overslept") case <-ctx.Done(): fmt.Println(ctx.Err()) } }透過上面的程式碼有了一個基本的使用,那麼如果我們透過etcd來做配置管理,如果配置更改之後,我們如何通知對應的伺服器配置更改,透過下面例子演示:
package main import ( "github.com/coreos/etcd/clientv3" "time" "fmt" "context" ) func main() { cli,err := clientv3.New(clientv3.Config{ Endpoints:[]string{"192.168.0.118:2371"}, DialTimeout:5*time.Second, }) if err != nil { fmt.Println("connect failed,err:",err) return } defer cli.Close() // 这里会阻塞 rch := cli.Watch(context.Background(),"logagent/conf/") for wresp := range rch{ for _,ev := range wresp.Events{ fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) } } }實現一個kafka的消費者代碼的簡單範例:
package main import ( "github.com/Shopify/sarama" "strings" "fmt" "time" ) func main() { consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil) if err != nil{ fmt.Println("failed to start consumer:",err) return } partitionList,err := consumer.Partitions("nginx_log") if err != nil { fmt.Println("Failed to get the list of partitions:",err) return } fmt.Println(partitionList) for partition := range partitionList{ pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err) return } defer pc.AsyncClose() go func(partitionConsumer sarama.PartitionConsumer){ for msg := range pc.Messages(){ fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value)) } }(pc) } time.Sleep(time.Hour) consumer.Close() }但是上面的程式碼並不是最佳程式碼,因為我們最後是透過time.sleep等待goroutine的執行,我們可以更改為透過sync.WaitGroup方式實作
package main import ( "github.com/Shopify/sarama" "strings" "fmt" "sync" ) var ( wg sync.WaitGroup ) func main() { consumer,err := sarama.NewConsumer(strings.Split("192.168.0.118:9092",","),nil) if err != nil{ fmt.Println("failed to start consumer:",err) return } partitionList,err := consumer.Partitions("nginx_log") if err != nil { fmt.Println("Failed to get the list of partitions:",err) return } fmt.Println(partitionList) for partition := range partitionList{ pc,err := consumer.ConsumePartition("nginx_log",int32(partition),sarama.OffsetNewest) if err != nil { fmt.Printf("failed to start consumer for partition %d:%s\n",partition,err) return } defer pc.AsyncClose() go func(partitionConsumer sarama.PartitionConsumer){ wg.Add(1) for msg := range partitionConsumer.Messages(){ fmt.Printf("partition:%d Offset:%d Key:%s Value:%s",msg.Partition,msg.Offset,string(msg.Key),string(msg.Value)) } wg.Done() }(pc) } //time.Sleep(time.Hour) wg.Wait() consumer.Close() }
將客戶端需要收集的日誌資訊放到etcd中
關於etcd處理的程式碼為:package main import ( "github.com/coreos/etcd/clientv3" "time" "github.com/astaxie/beego/logs" "context" "fmt" ) var Client *clientv3.Client var logConfChan chan string // 初始化etcd func initEtcd(addr []string,keyfmt string,timeout time.Duration)(err error){ var keys []string for _,ip := range ipArrays{ //keyfmt = /logagent/%s/log_config keys = append(keys,fmt.Sprintf(keyfmt,ip)) } logConfChan = make(chan string,10) logs.Debug("etcd watch key:%v timeout:%v", keys, timeout) Client,err = clientv3.New(clientv3.Config{ Endpoints:addr, DialTimeout: timeout, }) if err != nil{ logs.Error("connect failed,err:%v",err) return } logs.Debug("init etcd success") waitGroup.Add(1) for _, key := range keys{ ctx,cancel := context.WithTimeout(context.Background(),2*time.Second) // 从etcd中获取要收集日志的信息 resp,err := Client.Get(ctx,key) cancel() if err != nil { logs.Warn("get key %s failed,err:%v",key,err) continue } for _, ev := range resp.Kvs{ logs.Debug("%q : %q\n", ev.Key, ev.Value) logConfChan <- string(ev.Value) } } go WatchEtcd(keys) return } func WatchEtcd(keys []string){ // 这里用于检测当需要收集的日志信息更改时及时更新 var watchChans []clientv3.WatchChan for _,key := range keys{ rch := Client.Watch(context.Background(),key) watchChans = append(watchChans,rch) } for { for _,watchC := range watchChans{ select{ case wresp := <-watchC: for _,ev:= range wresp.Events{ logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value) logConfChan <- string(ev.Kv.Value) } default: } } time.Sleep(time.Second) } waitGroup.Done() } func GetLogConf()chan string{ return logConfChan }同樣的這裡增加對了限速的處理,畢竟日誌收集程式不能影響了目前業務的效能,所以增加了limit.go用於限制速度:
package main import ( "time" "sync/atomic" "github.com/astaxie/beego/logs" ) type SecondLimit struct { unixSecond int64 curCount int32 limit int32 } func NewSecondLimit(limit int32) *SecondLimit { secLimit := &SecondLimit{ unixSecond:time.Now().Unix(), curCount:0, limit:limit, } return secLimit } func (s *SecondLimit) Add(count int) { sec := time.Now().Unix() if sec == s.unixSecond { atomic.AddInt32(&s.curCount,int32(count)) return } atomic.StoreInt64(&s.unixSecond,sec) atomic.StoreInt32(&s.curCount, int32(count)) } func (s *SecondLimit) Wait()bool { for { sec := time.Now().Unix() if (sec == atomic.LoadInt64(&s.unixSecond)) && s.curCount == s.limit { time.Sleep(time.Microsecond) logs.Debug("limit is running,limit:%d s.curCount:%d",s.limit,s.curCount) continue } if sec != atomic.LoadInt64(&s.unixSecond) { atomic.StoreInt64(&s.unixSecond,sec) atomic.StoreInt32(&s.curCount,0) } logs.Debug("limit is exited") return false } }推薦:
以上是go語言實作日誌收集系統圖文詳解的詳細內容。更多資訊請關注PHP中文網其他相關文章!