go: kafka 将group设置为最新

  • 2019 年 11 月 21 日
  • 笔记

有时,在确保group当前没有consumer的情况下,可以将这个group的偏移设置成最新,以保证下次启动时,group能从最新的消息消费。 代码:

func initKafka() (err error) {      defer checkerr.MarkPanic(&err)        topics := []string{"hello"}      groupId := "superpig"      addr := []string{"127.0.0.1:9092"}        client, err := sarama.NewClient(addr, nil)      checkerr.CheckError(err)        partionIds, err := client.Partitions(topics[0])      checkerr.CheckError(err)        defer client.Close()        time.Sleep(time.Second)        config := cluster.NewConfig()      config.Consumer.Return.Errors = true      config.Group.Return.Notifications = true      config.Version = sarama.V2_0_0_0        config.Consumer.Offsets.Initial = sarama.OffsetNewest        consumer, err := cluster.NewConsumer(addr, groupId, topics, config)      defer consumer.Close()        // 这两个go 必不可少,否则不正常,我也不知道为啥      go func() {          for err := range consumer.Errors() {              logging.Errorf("consumer.Error: groupId:%s:Error: %s;topic:%vn",                  groupId, err.Error(),                  topics[0])          }      }()        go func() {          for ntf := range consumer.Notifications() {              logging.Infof("consumer.Notification: groupId:%s Rebalanced: %+v;topic:%vn",                  groupId, ntf, topics[0])          }      }()        time.Sleep(time.Second * 5)        for id := range partionIds {          lastoffset, err := client.GetOffset(topics[0],              int32(id), sarama.OffsetNewest)          checkerr.CheckError(err)          // 必须 lastoffset - 1,否则offset被设置成0          logging.Infof("lastoffset:%v", lastoffset)          consumer.MarkPartitionOffset(topics[0], int32(id), lastoffset-1, "")          err = consumer.CommitOffsets()          checkerr.CheckError(err)      }        return nil  }