02 . Go开发一个日志收集平台之Context及etcd简单使用

Context简单使用

使用 context 设置、获取 value 值

应用于全局通用参数传递

package main

import (
	"context"
	"fmt"
)

// ctxKey is a private type for the context keys used by this program. Using a
// dedicated type instead of a bare string prevents collisions with values that
// other packages may store on the same context.
type ctxKey string

// Keys under which the request-scoped values are stored in the context.
const (
	traceIDKey ctxKey = "trace_id"
	sessionKey ctxKey = "session"
)

// process prints the trace ID and the session carried by ctx.
//
// The trace ID falls back to 1234 when ctx holds no int under traceIDKey, and
// the session falls back to the empty string when ctx holds no string under
// sessionKey.
func process(ctx context.Context) {
	traceID, ok := ctx.Value(traceIDKey).(int)
	if !ok {
		traceID = 1234
	}
	fmt.Printf("ret:%d\n", traceID)

	// A failed assertion leaves session as "", which is the intended default.
	session, _ := ctx.Value(sessionKey).(string)
	fmt.Printf("session:%s\n", session)
}

// main stores a trace ID and a session in a context and passes it to process.
func main() {
	ctx := context.WithValue(context.Background(), traceIDKey, 1314)
	ctx = context.WithValue(ctx, sessionKey, "sdlkfjkaslfsalfsafjalskfj")
	process(ctx)
}
超时控制
package main

import (
	"context"
	"fmt"
	"io/ioutil"
	"net/http"
	"time"
)
// Result pairs an *http.Response with an error so that both can be handed
// around as a single value (for example, sent over one channel).
type Result struct {
	r   *http.Response
	err error
}
// process issues a GET request to http://www.google.com with a two-second
// deadline and prints the outcome to standard output.
//
// The deadline is enforced by attaching the context to the request itself, so
// the HTTP client aborts the round trip (and the body read) when it expires.
// One of three messages is printed:
//   - "Timeout! err: ..." when the request failed and the context is done,
//   - "http request failed, err: ..." for any other request error,
//   - "Server Response: ..." followed by the body on success.
func process() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, "http://www.google.com", nil)
	if err != nil {
		fmt.Println("http request failed, err:", err)
		return
	}

	// The shared default client is used so connections are pooled instead of
	// creating a fresh Transport on every call.
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		// ctx.Err() is non-nil only once the deadline has passed, which
		// separates a timeout from every other kind of failure.
		if ctx.Err() != nil {
			fmt.Println("Timeout! err:", err)
			return
		}
		fmt.Println("http request failed, err:", err)
		return
	}
	defer resp.Body.Close()

	out, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		fmt.Println("read response failed, err:", err)
		return
	}
	fmt.Printf("Server Response: %s", out)
}
// main is the program entry point; it runs process once and then exits.
func main() { process() }

Etcd使用

部署及原理请看我前面写的文章

https://www.cnblogs.com/you-men/p/13570241.html

安装go使用etcd的包
go get github.com/coreos/etcd

// 因为 etcd 依赖的包(例如 grpc)版本发生了变更,直接使用会无法运行,所以需要修改 go.mod 文件(下面把 grpc 固定在 v1.26.0)让 etcd 跑起来。go.mod 文件如下:

module test

go 1.13

// NOTE(review): both github.com/coreos/etcd and go.etcd.io/etcd are required
// at v3.3.22; the Go programs accompanying this file import only
// go.etcd.io/etcd/clientv3 — confirm whether the coreos path is still needed.
require (
	github.com/coreos/etcd v3.3.22+incompatible
	github.com/coreos/go-semver v0.3.0 // indirect
	github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
	//github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf // indirect
	github.com/coreos/go-systemd/v22 v22.1.0
	github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
	github.com/gogo/protobuf v1.3.1 // indirect
	github.com/google/uuid v1.1.1 // indirect
	github.com/gorhill/cronexpr v0.0.0-20180427100037-88b0669f7d75
	go.etcd.io/bbolt v1.3.4 // indirect
	go.etcd.io/etcd v3.3.22+incompatible
	go.uber.org/zap v1.15.0 // indirect
	// grpc is held at v1.26.0 and the v1.29.1 requirement is left commented
	// out; presumably this is the dependency change that kept etcd from
	// running — TODO confirm.
	//google.golang.org/grpc v1.29.1 // indirect
	google.golang.org/grpc v1.26.0
)

创建etcd链接
package main

import (
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

// main creates an etcd v3 client for the three configured endpoints and
// prints whether the client could be created.
func main() {
	cfg := clientv3.Config{
		Endpoints: []string{
			"192.168.43.233:2379",
			"192.168.43.130:22379",
			"192.168.43.246:32379",
		},
		DialTimeout: 5 * time.Second,
	}

	client, dialErr := clientv3.New(cfg)
	if dialErr != nil {
		fmt.Println("connect failed, err:", dialErr)
		return
	}
	// Registered up front; the close still happens only when main returns,
	// i.e. after the success message below has been printed.
	defer client.Close()

	fmt.Println("connect success")
}
PUT/GET
package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

// main writes the log-collection configuration to etcd under
// /logagent/collect_config, reads the same key back, and prints every
// key/value pair returned.
//
// Each etcd call gets its own one-second timeout; the first failure is
// printed and ends the program.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.43.233:2379", "192.168.43.130:22379", "192.168.43.246:32379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println("connect failed, err:", err)
		return
	}
	// Close the client on every return path so its connections and background
	// goroutines are released.
	defer cli.Close()

	const key = "/logagent/collect_config"

	// put
	value := `[{"path":"c:/tmp/nginx.log","topic":"web.log"},{"path":"d:/xxx/redis.log","topic":"redis.log"},{"path":"d:/xxx/mysql.log","topic":"mysql.log"}]`
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(ctx, key, value)
	cancel()
	if err != nil {
		fmt.Printf("put to etcd failed, err:%v\n", err)
		return
	}

	// get
	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(ctx, key)
	cancel()
	if err != nil {
		fmt.Printf("get from etcd failed, err:%v\n", err)
		return
	}
	for _, ev := range resp.Kvs {
		fmt.Printf("%s:%s\n", ev.Key, ev.Value)
	}
}
Watch
package main

import (
	"context"
	"fmt"
	"go.etcd.io/etcd/clientv3"
	"time"
)

// main connects to etcd and prints every change made to the key /nginx/log.
//
// It never returns on its own: whenever the watch channel is closed, the
// watch is re-established after a short pause.
func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"192.168.43.233:2379", "192.168.43.130:22379", "192.168.43.246:32379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		fmt.Println("connect failed, err:", err)
		return
	}
	fmt.Println("connect success")
	defer cli.Close()

	for {
		rch := cli.Watch(context.Background(), "/nginx/log")
		for wresp := range rch {
			// A response can carry an error instead of events; report it
			// rather than silently dropping it.
			if err := wresp.Err(); err != nil {
				fmt.Println("watch failed, err:", err)
				continue
			}
			for _, ev := range wresp.Events {
				fmt.Printf("%s %q :%q \n", ev.Type, ev.Kv.Key, ev.Kv.Value)
			}
		}
		// The channel was closed. Pause before re-subscribing so that a
		// persistent failure does not turn this loop into a busy spin.
		time.Sleep(time.Second)
	}
}