


A detailed, illustrated walkthrough of a log collection system implemented in Go
I have sketched out the architecture of this log collection system, as shown below.
The overall logic of the code implemented this time is:
The complete code address is: https://github.com/pythonsite/logagent
Introduction to etcd
etcd is a highly available distributed key-value store that can be used for configuration sharing and service discovery.
Similar projects: ZooKeeper and Consul
Development language: Go
Interface: provides a RESTful interface that is simple to use
Implementation algorithm: strong consistency based on the Raft algorithm, giving a highly available service storage directory
Application scenarios of etcd:
1. Service discovery and service registration
2. Configuration center (this is what the log collection client we implement needs)
3. Distributed locks
4. Master election
The official etcd website has a very concise introduction:
Download address: https://github.com/coreos/etcd/releases/
Just download the version that matches your environment and start it.
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl set name zhaofan
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]# ./etcdctl get name
zhaofan
[root@localhost etcd-v3.2.18-linux-amd64]#
Introduction to context and how to use it
The name is usually translated as "context management". So what is context for? It mainly serves two purposes:
1. Controlling the timeout of a goroutine
2. Carrying context data
The following simple example illustrates the first:

package main

import (
    "context"
    "fmt"
    "io/ioutil"
    "net/http"
    "time"
)

// Result carries the outcome of the HTTP request across the channel.
type Result struct {
    r   *http.Response
    err error
}

func process() {
    // ctx is cancelled automatically after 2 seconds.
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel()

    client := &http.Client{}
    c := make(chan Result, 1)
    req, err := http.NewRequest("GET", "http://www.google.com", nil)
    if err != nil {
        fmt.Println("http request failed,err:", err)
        return
    }
    // Attach ctx so the in-flight request is aborted on timeout
    // (this replaces the deprecated Transport.CancelRequest).
    req = req.WithContext(ctx)

    // Run the request in a goroutine and send its outcome into the channel.
    go func() {
        resp, err := client.Do(req)
        c <- Result{resp, err}
    }()

    select {
    case <-ctx.Done():
        fmt.Println("timeout!")
    case res := <-c:
        // Check the error first: res.r is nil when the request failed.
        if res.err != nil {
            fmt.Println("request failed,err:", res.err)
            return
        }
        defer res.r.Body.Close()
        out, _ := ioutil.ReadAll(res.r.Body)
        fmt.Printf("server response:%s", out)
    }
}

func main() {
    process()
}

A context can also carry data along the call chain. A code example:
package main

import (
    "context"
    "fmt"
)

func add(ctx context.Context, a, b int) int {
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n", traceId)
    return a + b
}

func calc(ctx context.Context, a, b int) int {
    traceId := ctx.Value("trace_id").(string)
    fmt.Printf("trace_id:%v\n", traceId)
    // Pass ctx on to add
    return add(ctx, a, b)
}

func main() {
    // Store trace_id in ctx and pass ctx to calc
    ctx := context.WithValue(context.Background(), "trace_id", "123456")
    calc(ctx, 20, 30)
}
Combining etcd and context
A simple example of connecting to etcd from Go follows. One small thing to watch out for is how etcd is started: with the default startup options the client may fail to connect, especially if etcd is installed in a virtual machine, so start it with the following command:
./etcd --listen-client-urls http://0.0.0.0:2371 --advertise-client-urls http://0.0.0.0:2371 --listen-peer-urls http://0.0.0.0:2381
package main

import (
    "fmt"
    "time"

    etcd_client "github.com/coreos/etcd/clientv3"
)

func main() {
    cli, err := etcd_client.New(etcd_client.Config{
        Endpoints:   []string{"192.168.0.118:2371"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:", err)
        return
    }
    fmt.Println("connect success")
    defer cli.Close()
}

The following example connects to etcd, then saves and reads back a value:
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"192.168.0.118:2371"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:", err)
        return
    }
    fmt.Println("connect succ")
    defer cli.Close()

    // Write the key, giving the request at most one second.
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    _, err = cli.Put(ctx, "logagent/conf/", "sample_value")
    cancel()
    if err != nil {
        fmt.Println("put failed,err", err)
        return
    }

    // Read the key back with a fresh one-second timeout.
    ctx, cancel = context.WithTimeout(context.Background(), time.Second)
    resp, err := cli.Get(ctx, "logagent/conf/")
    cancel()
    if err != nil {
        fmt.Println("get failed,err:", err)
        return
    }
    for _, ev := range resp.Kvs {
        fmt.Printf("%s:%s\n", ev.Key, ev.Value)
    }
}

The official context documentation also has a very useful example that shows how to control the exit of a goroutine you have started. The code is as follows:
package main

import (
    "context"
    "fmt"
)

func main() {
    // gen generates integers in a separate goroutine and
    // sends them to the returned channel.
    // The callers of gen need to cancel the context once
    // they are done consuming generated integers not to leak
    // the internal goroutine started by gen.
    gen := func(ctx context.Context) <-chan int {
        dst := make(chan int)
        n := 1
        go func() {
            for {
                select {
                case <-ctx.Done():
                    return // returning not to leak the goroutine
                case dst <- n:
                    n++
                }
            }
        }()
        return dst
    }

    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // cancel when we are finished consuming integers

    for n := range gen(ctx) {
        fmt.Println(n)
        if n == 5 {
            break
        }
    }
}

The official documentation also has a code example that demonstrates WithDeadline:
package main

import (
    "context"
    "fmt"
    "time"
)

func main() {
    d := time.Now().Add(50 * time.Millisecond)
    ctx, cancel := context.WithDeadline(context.Background(), d)

    // Even though ctx will be expired, it is good practice to call its
    // cancelation function in any case. Failure to do so may keep the
    // context and its parent alive longer than necessary.
    defer cancel()

    select {
    case <-time.After(1 * time.Second):
        fmt.Println("overslept")
    case <-ctx.Done():
        fmt.Println(ctx.Err())
    }
}

The code above covers basic usage. Now, if we use etcd for configuration management and the configuration changes, how do we notify the corresponding server of the change? The following example demonstrates:
package main

import (
    "context"
    "fmt"
    "time"

    "github.com/coreos/etcd/clientv3"
)

func main() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   []string{"192.168.0.118:2371"},
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        fmt.Println("connect failed,err:", err)
        return
    }
    defer cli.Close()

    // This blocks: the loop runs for as long as the watch channel stays open.
    rch := cli.Watch(context.Background(), "logagent/conf/")
    for wresp := range rch {
        for _, ev := range wresp.Events {
            fmt.Printf("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
        }
    }
}

A simple example of a Kafka consumer:
package main

import (
    "fmt"
    "strings"
    "time"

    "github.com/Shopify/sarama"
)

func main() {
    consumer, err := sarama.NewConsumer(strings.Split("192.168.0.118:9092", ","), nil)
    if err != nil {
        fmt.Println("failed to start consumer:", err)
        return
    }
    partitionList, err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:", err)
        return
    }
    fmt.Println(partitionList)
    // Range over the partition IDs themselves, not the slice indexes.
    for _, partition := range partitionList {
        pc, err := consumer.ConsumePartition("nginx_log", partition, sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // Use the goroutine's own parameter rather than the loop variable pc.
        go func(partitionConsumer sarama.PartitionConsumer) {
            for msg := range partitionConsumer.Messages() {
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
        }(pc)
    }
    time.Sleep(time.Hour)
    consumer.Close()
}

But the code above is not ideal, because we end up waiting for the goroutines by calling time.Sleep. We can use sync.WaitGroup instead:
package main

import (
    "fmt"
    "strings"
    "sync"

    "github.com/Shopify/sarama"
)

var (
    wg sync.WaitGroup
)

func main() {
    consumer, err := sarama.NewConsumer(strings.Split("192.168.0.118:9092", ","), nil)
    if err != nil {
        fmt.Println("failed to start consumer:", err)
        return
    }
    partitionList, err := consumer.Partitions("nginx_log")
    if err != nil {
        fmt.Println("Failed to get the list of partitions:", err)
        return
    }
    fmt.Println(partitionList)
    // Range over the partition IDs themselves, not the slice indexes.
    for _, partition := range partitionList {
        pc, err := consumer.ConsumePartition("nginx_log", partition, sarama.OffsetNewest)
        if err != nil {
            fmt.Printf("failed to start consumer for partition %d:%s\n", partition, err)
            return
        }
        defer pc.AsyncClose()
        // Register the goroutine before starting it; calling wg.Add inside
        // the goroutine would race with wg.Wait below.
        wg.Add(1)
        go func(partitionConsumer sarama.PartitionConsumer) {
            defer wg.Done()
            for msg := range partitionConsumer.Messages() {
                fmt.Printf("partition:%d Offset:%d Key:%s Value:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
            }
        }(pc)
    }
    wg.Wait()
    consumer.Close()
}
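The WaitGroup pattern can be tried out without a Kafka broker. The following is a minimal sketch in which plain channels stand in for the partition consumers. The functions drainAll and makeSource and the sample messages are hypothetical, introduced only for illustration. The point to notice is that wg.Add runs before each goroutine starts, so wg.Wait cannot return early.

```go
package main

import (
    "fmt"
    "sync"
)

// drainAll starts one goroutine per source channel, waits until every
// channel has been closed and drained, and returns the number of
// messages seen in total.
func drainAll(sources []<-chan string) int {
    var (
        wg    sync.WaitGroup
        mu    sync.Mutex
        total int
    )
    for _, src := range sources {
        wg.Add(1) // register before the goroutine starts
        go func(ch <-chan string) {
            defer wg.Done()
            for range ch {
                mu.Lock()
                total++
                mu.Unlock()
            }
        }(src)
    }
    wg.Wait()
    return total
}

// makeSource returns a pre-filled, already closed channel that plays
// the role of one partition's message stream.
func makeSource(msgs ...string) <-chan string {
    ch := make(chan string, len(msgs))
    for _, m := range msgs {
        ch <- m
    }
    close(ch)
    return ch
}

func main() {
    sources := []<-chan string{
        makeSource("a", "b"),
        makeSource("c"),
    }
    fmt.Println("messages consumed:", drainAll(sources))
}
```

Unlike these closed channels, a Kafka partition consumer keeps its message channel open until it is closed, so in the consumer program wg.Wait blocks for as long as the program is meant to run.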
Putting the log information that the client needs to collect into etcd
The code for etcd processing is:

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/astaxie/beego/logs"
    "github.com/coreos/etcd/clientv3"
)

var Client *clientv3.Client
var logConfChan chan string

// initEtcd initializes the etcd client and loads the initial configuration.
// ipArrays and waitGroup are not declared in this file; they are assumed
// to be package-level variables defined elsewhere in the project.
func initEtcd(addr []string, keyfmt string, timeout time.Duration) (err error) {
    var keys []string
    for _, ip := range ipArrays {
        // keyfmt = /logagent/%s/log_config
        keys = append(keys, fmt.Sprintf(keyfmt, ip))
    }
    logConfChan = make(chan string, 10)
    logs.Debug("etcd watch key:%v timeout:%v", keys, timeout)
    Client, err = clientv3.New(clientv3.Config{
        Endpoints:   addr,
        DialTimeout: timeout,
    })
    if err != nil {
        logs.Error("connect failed,err:%v", err)
        return
    }
    logs.Debug("init etcd success")
    waitGroup.Add(1)
    for _, key := range keys {
        ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        // Fetch from etcd the information about which logs to collect
        resp, err := Client.Get(ctx, key)
        cancel()
        if err != nil {
            logs.Warn("get key %s failed,err:%v", key, err)
            continue
        }
        for _, ev := range resp.Kvs {
            logs.Debug("%q : %q\n", ev.Key, ev.Value)
            logConfChan <- string(ev.Value)
        }
    }
    go WatchEtcd(keys)
    return
}

func WatchEtcd(keys []string) {
    // Watch every key so that changes to the log collection
    // configuration are picked up promptly.
    var watchChans []clientv3.WatchChan
    for _, key := range keys {
        rch := Client.Watch(context.Background(), key)
        watchChans = append(watchChans, rch)
    }
    for {
        // Poll each watch channel without blocking, then sleep for a second.
        for _, watchC := range watchChans {
            select {
            case wresp := <-watchC:
                for _, ev := range wresp.Events {
                    logs.Debug("%s %q : %q\n", ev.Type, ev.Kv.Key, ev.Kv.Value)
                    logConfChan <- string(ev.Kv.Value)
                }
            default:
            }
        }
        time.Sleep(time.Second)
    }
    // Not reached: the loop above never exits.
    waitGroup.Done()
}

func GetLogConf() chan string {
    return logConfChan
}

Similarly, rate limiting is added here. After all, the log collection program must not affect the performance of the business it runs alongside, so limit.go is added to limit the speed:
package main

import (
    "sync/atomic"
    "time"

    "github.com/astaxie/beego/logs"
)

// SecondLimit counts events within the current one-second window.
type SecondLimit struct {
    unixSecond int64
    curCount   int32
    limit      int32
}

func NewSecondLimit(limit int32) *SecondLimit {
    secLimit := &SecondLimit{
        unixSecond: time.Now().Unix(),
        curCount:   0,
        limit:      limit,
    }
    return secLimit
}

// Add records count events against the current second, starting a new
// window when the second has changed.
func (s *SecondLimit) Add(count int) {
    sec := time.Now().Unix()
    if sec == atomic.LoadInt64(&s.unixSecond) {
        atomic.AddInt32(&s.curCount, int32(count))
        return
    }
    atomic.StoreInt64(&s.unixSecond, sec)
    atomic.StoreInt32(&s.curCount, int32(count))
}

// Wait blocks while the limit for the current second has been reached.
func (s *SecondLimit) Wait() bool {
    for {
        sec := time.Now().Unix()
        cur := atomic.LoadInt32(&s.curCount)
        // Compare with >= rather than ==, because Add can push the
        // counter past the limit in a single call.
        if sec == atomic.LoadInt64(&s.unixSecond) && cur >= s.limit {
            time.Sleep(time.Microsecond)
            logs.Debug("limit is running,limit:%d s.curCount:%d", s.limit, cur)
            continue
        }
        if sec != atomic.LoadInt64(&s.unixSecond) {
            atomic.StoreInt64(&s.unixSecond, sec)
            atomic.StoreInt32(&s.curCount, 0)
        }
        logs.Debug("limit is exited")
        return false
    }
}