kafka相關操作
調試工具操作
- 啟動 bin/zookeeper-server-start.sh config/zookeeper.properties bin/kafka-server-start.sh config/server.properties - 列出topic ./kafka-topics.sh --zookeeper 9.43.186.132:2181,9.43.186.152:2181,9.43.186.176:2181 --list - 創建topic bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1 - 查看topic的狀態 bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test - 消費者 讀數據 bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group superpig - 生產者 bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test - 刪除topic ./kafka-topics --delete --zookeeper 10.0.8.23:2181 --topic PedesJobResult
go客戶端,讀消息
/* "github.com/Shopify/sarama" cluster "github.com/bsm/sarama-cluster" */ ctx, cancel = context.WithCancel(context.Background()) config := cluster.NewConfig() config.Consumer.Return.Errors = true config.Group.Return.Notifications = true config.Version = sarama.V2_0_0_0 // 這裡很有迷惑性,實際上,這個選項只有第一次new consumer的時候才會有效,當partion已經存在offset,這是沒用的 // 如果想每次重啟,都忽略中間產生的消息,必須更換group_ip config.Consumer.Offsets.Initial = sarama.OffsetNewest var topicArr = []string{topic} // KafkaAddresses=["kafka.service.consul:9092"] consumer, err := cluster.NewConsumer(kafkaAddress, kafkaGroupID, topicArr, config) if err != nil { logging.Errorf("cluster.NewConsumer err:%s", err) return nil } go func() { for err := range consumer.Errors() { logging.Errorf("consumer.Error: groupId:%s:Error: %sn", kafkaGroupID, err.Error()) } }() go func() { for ntf := range consumer.Notifications() { logging.Infof("consumer.Notification: groupId:%s Rebalanced: %+v n", kafkaGroupID, ntf) } }() logging.Infof("NewKafka loop before") Loop: for { select { case msgc, ok := <-consumer.Messages(): if ok { //logging.Debugf("read msg %v", msgc) // do sth // 如果sarama.OffsetNewest ,commit意義不大 consumer.MarkOffset(msg, "") } else { logging.Errorf("read msg not ok %v", topic) } case <-ctx.Done(): logging.Infof("kafka done %v", topic) break Loop case <-time.After(time.Second * 3): //logging.Debugf("NewKafka %v timeout", topic) } } logging.Infof("NewKafka kafka exit %v", topic) consumer.Close()
go客戶端,寫消息
// producer config config := sarama.NewConfig() config.Producer.Retry.Max = 5 config.Producer.RequiredAcks = sarama.WaitForAll config.Producer.Return.Successes = true // sync producer producer, err := sarama.NewSyncProducer(addr, config) i := 0 for { i++ msg := &sarama.ProducerMessage{ Topic: topics[0], Value: sarama.StringEncoder(strconv.Itoa(i)), } _, _, err = producer.SendMessage(msg) checkerr.CheckError(err) time.Sleep(time.Millisecond * 500) }