kafka相关操作

  • 2019 年 11 月 22 日
  • 筆記

调试工具操作

- 启动
  bin/zookeeper-server-start.sh config/zookeeper.properties
  bin/kafka-server-start.sh config/server.properties
- 列出topic
  ./kafka-topics.sh --zookeeper 9.43.186.132:2181,9.43.186.152:2181,9.43.186.176:2181 --list
- 创建topic
  bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic test --partitions 1 --replication-factor 1
- 查看topic的状态
  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
- 消费者 读数据
  bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning --group superpig
- 生产者
  bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
- 删除topic
  ./kafka-topics --delete --zookeeper 10.0.8.23:2181 --topic PedesJobResult

go客户端,读消息

/*  "github.com/Shopify/sarama"  cluster "github.com/bsm/sarama-cluster"  */  ctx, cancel = context.WithCancel(context.Background())    config := cluster.NewConfig()  config.Consumer.Return.Errors = true  config.Group.Return.Notifications = true  config.Version = sarama.V2_0_0_0    // 这里很有迷惑性,实际上,这个选项只有第一次new consumer的时候才会有效,当partion已经存在offset,这是没用的  // 如果想每次重启,都忽略中间产生的消息,必须更换group_ip  config.Consumer.Offsets.Initial = sarama.OffsetNewest    var topicArr = []string{topic}  // KafkaAddresses=["kafka.service.consul:9092"]  consumer, err := cluster.NewConsumer(kafkaAddress,      kafkaGroupID, topicArr, config)    if err != nil {      logging.Errorf("cluster.NewConsumer  err:%s", err)      return nil  }    go func() {      for err := range consumer.Errors() {          logging.Errorf("consumer.Error: groupId:%s:Error: %sn", kafkaGroupID, err.Error())      }  }()  go func() {      for ntf := range consumer.Notifications() {          logging.Infof("consumer.Notification: groupId:%s Rebalanced: %+v n", kafkaGroupID, ntf)      }  }()    logging.Infof("NewKafka loop before")  Loop:      for {          select {          case msgc, ok := <-consumer.Messages():              if ok {                  //logging.Debugf("read msg %v", msgc)                  // do sth                  // 如果sarama.OffsetNewest ,commit意义不大                  consumer.MarkOffset(msg, "")                } else {                  logging.Errorf("read msg not ok %v", topic)              }          case <-ctx.Done():              logging.Infof("kafka done %v", topic)              break Loop          case <-time.After(time.Second * 3):              //logging.Debugf("NewKafka %v timeout", topic)          }      }      logging.Infof("NewKafka kafka exit %v", topic)      consumer.Close()

go客户端,写消息

// producer config  config := sarama.NewConfig()  config.Producer.Retry.Max = 5  config.Producer.RequiredAcks = sarama.WaitForAll  config.Producer.Return.Successes = true    // sync producer  producer, err := sarama.NewSyncProducer(addr, config)    i := 0  for {      i++      msg := &sarama.ProducerMessage{          Topic: topics[0],          Value: sarama.StringEncoder(strconv.Itoa(i)),      }      _, _, err = producer.SendMessage(msg)      checkerr.CheckError(err)      time.Sleep(time.Millisecond * 500)  }