nsq topic

Topic相關的程式碼主要位於nsqd/topic.go中。

上一篇文字我們講解了下nsq的啟動流程。對nsq的整體框架有了一個大概的了解。本篇文章就是由大到小。對於topic這一部分進行詳盡的講解。

topic 管理著多個 channel 通過從 client 中獲取消息,然後將消息發送到 channel 中傳遞給客戶端.在 channel 初始化時會載入原有的 topic 並在最後統一執行 topic.Start(),新創建的 topic 會同步給 lookupd 後開始運行. nsqd 中通過創建創建多個 topic 來管理不同類別的頻道.

topic結構體:

type Topic struct {
  // 64bit atomic vars need to be first for proper alignment on 32bit platforms
  // 這兩個欄位僅作統計資訊,保證 32 位對其操作
  messageCount uint64  // 累計消息數
  messageBytes uint64// 累計消息體的位元組數
​
  sync.RWMutex  // 加鎖,包括 putMessage
​
  name              string // topic名,生產和消費時需要指定此名稱
  channelMap        map[string]*Channel  // 保存每個channel name和channel指針的映射
  backend           BackendQueue    // 磁碟隊列,當記憶體memoryMsgChan滿時,寫入硬碟隊列
  memoryMsgChan     chan *Message    // 消息優先存入這個記憶體chan
  startChan         chan int    // 接收開始訊號的 channel,調用 start 開始 topic 消息循環
​
  exitChan          chan int    // 判斷 topic 是否退出
​
  // 在 select 的地方都要添加 exitChan
  // 除非使用 default 或者保證程式不會永遠阻塞在 select 處,即可以退出循環
  // channel 更新時用來通知並更新消息循環中的 chan 數組
  channelUpdateChan chan int
  // 用來等待所有的子 goroutine
  waitGroup         util.WaitGroupWrapper
  exitFlag          int32     // topic 退出標識符
  idFactory         *guidFactory    // 生成 guid 的工廠方法
​
  ephemeral      bool  // 該 topic 是否是臨時 topic
  deleteCallback func(*Topic)   // topic 刪除時的回調函數
  deleter        sync.Once   // 確保 deleteCallback 僅執行一次
​
  paused    int32   // topic 是否暫停
  pauseChan chan int   // 改變 topic 暫停/運行狀態的通道
  ctx *context  // topic 的上下文
}

可以看到。topic 採用了 map + *Channel 來管理所有的channel. 並且也有 memoryMsgChan 和 backend 2個隊列。

實例化Topic :

下面就是 topic 的創建流程,傳入的參數參數包括,topicName,上下文環境,刪除回調函數:

func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
  t := &Topic{
    name:              topicName, //topic名稱
    channelMap:        make(map[string]*Channel),
    memoryMsgChan:     nil,
    startChan:         make(chan int, 1),
    exitChan:          make(chan int),
    channelUpdateChan: make(chan int),
    ctx:               ctx, //上下文指針
    paused:            0,
    pauseChan:         make(chan int),
    deleteCallback:    deleteCallback, //刪除callback函數
    // 所有 topic 使用同一個 guidFactory,因為都是用的 nsqd 的 ctx.nsqd.getOpts().ID 為基礎生成的
    idFactory:         NewGUIDFactory(ctx.nsqd.getOpts().ID),
  }
  // create mem-queue only if size > 0 (do not use unbuffered chan)
  //  // 根據消息隊列生成消息 chan,default size = 10000
  if ctx.nsqd.getOpts().MemQueueSize > 0 {
    // 初始化一個消息隊列
    t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
  }
  // 判斷這個 topic 是不是暫時的,暫時的 topic 消息僅僅存儲在記憶體中
  // DummyBackendQueue 和 diskqueue 均實現了 backend 介面
  if strings.HasSuffix(topicName, "#ephemeral") {
    // 臨時的 topic,設置標誌並使用 newDummyBackendQueue 初始化 backend
    t.ephemeral = true
    t.backend = newDummyBackendQueue()   // 實現了 backend 但是並沒有邏輯,所有操作僅僅返回 nil
  } else {
    dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
      opts := ctx.nsqd.getOpts()
      lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
    }
    // 使用 diskqueue 初始化 backend 隊列
    t.backend = diskqueue.New(
      topicName,
      ctx.nsqd.getOpts().DataPath,
      ctx.nsqd.getOpts().MaxBytesPerFile,
      int32(minValidMsgLength),
      int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
      ctx.nsqd.getOpts().SyncEvery,
      ctx.nsqd.getOpts().SyncTimeout,
      dqLogf,
    )
  }
  // 使用一個新的協程來執行 messagePump
  //startChan 就發送給了它,messagePump 函數負責分發整個 topic 接收到的消息給該 topic 下的 channels.
  t.waitGroup.Wrap(t.messagePump)
  // 調用 Notify
  t.ctx.nsqd.Notify(t)
​
  return t
}

可以看到先實例化了一個Topic指針對象。初始化memoryMsgChan隊列, 默認1000個。並且判斷topicName是否是臨時topic,如果是的話,BackendQueue(這是個介面)實現了一個空的記憶體Queue. 否則使用 diskqueue來初始化 backend隊列。

隨後,NewTopic函數開啟一個新的goroutine來執行messagePump函數,該函數負責消息循環,將進入topic中的消息投遞到channel中。

最後,NewTopic函數執行t.ctx.nsqd.Notify(t),該函數在topic和channel創建、停止的時候調用, Notify函數通過執行PersistMetadata函數,將topic和channel的資訊寫到文件中。

func (n *NSQD) Notify(v interface{}) {
  persist := atomic.LoadInt32(&n.isLoading) == 0
  n.waitGroup.Wrap(func() {
    // by selecting on exitChan we guarantee that
    // we do not block exit, see issue #123
    select {
    //如果執行那一刻 有exitChan 那麼就走exit
    case <-n.exitChan:
      //否則就走正常邏輯 往notifyChan 里發個消息
    case n.notifyChan <- v:
      if !persist {
        return
      }
      n.Lock()
      err := n.PersistMetadata()
      if err != nil {
        n.logf(LOG_ERROR, "failed to persist metadata - %s", err)
      }
      n.Unlock()
    }
  })
}

Notify函數的實現時,首先考慮了數據持久化的時機,如果當前nsqd尚在初始化,則不需要立即持久化數據,因為nsqd在初始化後會進行一次統一的持久化工作,

Notify在進行數據持久化的時候採用了非同步的方式。使得topic和channel能以同步的方式來調用Nofity而不阻塞。在非同步運行的過程中, 通過waitGroup和監聽exitChan的使用保證了結束程式時goroutine能正常退出。

在執行持久化之前,case n.notifyChan <- v:語句向notifyChan傳遞消息,觸發lookupLoop函數(nsqd/lookup.go中)接收notifyChan消息的部分, 從而實現向loopupd註冊/取消註冊響應的topic或channel。

消息寫入Topic

客戶端通過nsqd的HTTP API或TCP API向特定topic發送消息,nsqd的HTTP或TCP模組通過調用對應topic的PutMessagePutMessages函數, 將消息投遞到topic中。PutMessagePutMessages函數都通過topic的私有函數put進行消息的投遞,兩個函數的區別僅在PutMessage只調用一次putPutMessages遍歷所有要投遞的消息,對每條消息使用put函數進行投遞。默認topic會優先往memoryMsgChan 隊列內投遞,如果記憶體隊列已滿,才會往磁碟隊列寫入,(臨時的topic磁碟隊列不做任何存儲,數據直接丟棄)

func (t *Topic) put(m *Message) error {
    select {
    case t.memoryMsgChan <- m:
    default:
        //寫入磁碟隊列
    }
    return nil
}

Start && messagePump 操作

topic的Start方法就是發送了個 startChan ,這裡有個小技巧,nsq使用了select來發送這個消息,這樣做的目的是如果start被並發調用了,第二個start會直接走到default里,什麼都不做.

那麼這個Start函數都有哪裡調用的呢。

1、 nsqd啟動的時候,觸發LoadMetadata 會把文件里的topic載入到記憶體里,這時候會調用Start方法

2、 用戶通過請求獲取topic的時候會通過 getTopic 來獲取或者創建topic

func (t *Topic) Start() {
  select {
  case t.startChan <- 1:
  default:
  }
}

接下來我們看下 messagePump, 剛才的 startChan 就是發給了這個函數,該函數在創建新的topic時通過waitGroup在新的goroutine中運行。該函數僅在觸發 startChan 開始運行,否則會阻塞住,直到退出。

for {
    select {
    case <-t.channelUpdateChan:
      continue
    case <-t.pauseChan:
      continue
    case <-t.exitChan:
      goto exit
    case <-t.startChan:
    }
    break
  }

messagePump函數初始化時先獲取當前存在的channel數組,設置memoryMsgChanbackendChan,隨後進入消息循環, 在循環中主要處理四種消息:

  1. 接收來自memoryMsgChanbackendChan兩個go channel進入的消息,並向當前的channal數組中的channel進行投遞

  2. 處理當前topic下channel的更新

  3. 處理當前topic的暫停和恢復

  4. 監聽當前topic的刪除

消息投遞

case msg = <-memoryMsgChan:
case buf = <-backendChan:
    msg, err = decodeMessage(buf)
    if err != nil {
        t.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
        continue
    }

這兩個case語句處理進入topic的消息,關於兩個go channel的區別會在後續的部落格中分析。 從memoryMsgChanbackendChan讀取到的消息是*Message類型,而從backendChan讀取到的消息是byte數組的。 因此取出backendChan的消息後海需要調用decodeMessage函數對byte數組進行解碼,返回*Message類型的消息。 二者都保存在msg變數中。

for i, channel := range chans {
    chanMsg := msg
    if i > 0 {
        chanMsg = NewMessage(msg.ID, msg.Body)
        chanMsg.Timestamp = msg.Timestamp
        chanMsg.deferred = msg.deferred
    }
    if chanMsg.deferred != 0 {
        channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
        continue
    }
    err := channel.PutMessage(chanMsg)
    if err != nil {
        t.ctx.nsqd.logf(
            "TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
            t.name, msg.ID, channel.name, err)
    }
}

隨後是將消息投到每個channel中,首先先對消息進行複製操作,這裡有個優化,對於第一次循環, 直接使用原消息進行發送以減少複製對象的開銷,此後的循環將對消息進行複製。對於即時的消息, 直接調用channel的PutMessage函數進行投遞,對於延遲的消息, 調用channel的StartDeferredTimeout函數進行投遞。對於這兩個函數的投遞細節,後續博文中會詳細分析。

Topic下Channel的更新

case <-t.channelUpdateChan:
    chans = chans[:0]
    t.RLock()
    for _, c := range t.channelMap {
        chans = append(chans, c)
    }
    t.RUnlock()
    if len(chans) == 0 || t.IsPaused() {
        memoryMsgChan = nil
        backendChan = nil
    } else {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }
    continue

Channel的更新比較簡單,從channelMap中取出每個channel,構成channel的數組以便後續進行消息的投遞。 並且根據當前是否有channel以及該topic是否處於暫停狀態來決定memoryMsgChanbackendChan是否為空。

Topic的暫停和恢復

case pause := <-t.pauseChan:
    if pause || len(chans) == 0 {
        memoryMsgChan = nil
        backendChan = nil
    } else {
        memoryMsgChan = t.memoryMsgChan
        backendChan = t.backend.ReadChan()
    }
    continue

這個case既處理topic的暫停也處理topic的恢復,pause變數決定其究竟是哪一種操作。 Topic的暫停和恢復其實和topic的更新很像,根據是否暫停以及是否有channel來決定是否分配memoryMsgChanbackendChan

messagePump函數的退出

case <-t.exitChan:
    goto exit
​
// ...
exit:
    t.ctx.nsqd.logf("TOPIC(%s): closing ... messagePump", t.name)
}
// End of messagePump

messagePump通過監聽exitChan來獲知topic是否被刪除,當topic的刪除時,跳轉到函數的最後,輸出日誌後退出消息循環。

Topic的關閉和刪除

// Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error {
    return t.exit(true)
}
​
// Close persists all outstanding topic data and closes all its channels
func (t *Topic) Close() error {
    return t.exit(false)
}
​
func (t *Topic) exit(deleted bool) error {
    if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
        return errors.New("exiting")
    }
​
    if deleted {
        t.ctx.nsqd.logf("TOPIC(%s): deleting", t.name)
​
        // since we are explicitly deleting a topic (not just at system exit time)
        // de-register this from the lookupd
        t.ctx.nsqd.Notify(t)
    } else {
        t.ctx.nsqd.logf("TOPIC(%s): closing", t.name)
    }
​
    close(t.exitChan)
​
    // synchronize the close of messagePump()
    t.waitGroup.Wait()
​
    if deleted {
        t.Lock()
        for _, channel := range t.channelMap {
            delete(t.channelMap, channel.name)
            channel.Delete()
        }
        t.Unlock()
​
        // empty the queue (deletes the backend files, too)
        t.Empty()
        return t.backend.Delete()
    }
​
    // close all the channels
    for _, channel := range t.channelMap {
        err := channel.Close()
        if err != nil {
            // we need to continue regardless of error to close all the channels
            t.ctx.nsqd.logf("ERROR: channel(%s) close - %s", channel.name, err)
        }
    }
​
    // write anything leftover to disk
    t.flush()
    return t.backend.Close()
}
// Exiting returns a boolean indicating if this topic is closed/exiting
func (t *Topic) Exiting() bool {
    return atomic.LoadInt32(&t.exitFlag) == 1
}

Topic關閉和刪除的實現都是調用exit函數,只是傳遞的參數不同,刪除時調用exit(true),關閉時調用exit(false)exit函數進入時通過atomic.CompareAndSwapInt32函數判斷當前是否正在退出,如果不是,則設置退出標記,對於已經在退出的topic,不再重複執行退出函數。 接著對於關閉操作,使用Notify函數通知lookupd以便其他nsqd獲知該消息。

隨後,exit函數調用close(t.exitChan)t.waitGroup.Wait()通知其他正在運行goroutine當前topic已經停止,並等待waitGroup中的goroutine結束運行。

最後,對於刪除和關閉兩種操作,執行不同的邏輯來完成最後的清理工作:

  • 對於刪除操作,需要清空channelMap並刪除所有channel,然後刪除記憶體和磁碟中所有未投遞的消息。最後關閉backend管理的的磁碟文件。

  • 對於關閉操作,不清空channelMap,只是關閉所有的channel,使用flush函數將所有memoryMsgChan中未投遞的消息用writeMessageToBackend保存到磁碟中。最後關閉backend管理的的磁碟文件。

func (t *Topic) flush() error {
    //...
    for {
        select {
        case msg := <-t.memoryMsgChan:
            err := writeMessageToBackend(&msgBuf, msg, t.backend)
            if err != nil {
                t.ctx.nsqd.logf(
                    "ERROR: failed to write message to backend - %s", err)
            }
        default:
            goto finish
        }
    }
    
finish:
    return nil
}

flush函數也使用到了default分支來檢測是否已經處理完全部消息。 由於此時已經沒有生產者向memoryMsgChan提供消息,因此如果出現阻塞就表示消息已經處理完畢。

func (t *Topic) Empty() error {
    for {
        select {
        case <-t.memoryMsgChan:
        default:
            goto finish
        }
    }
​
finish:
    return t.backend.Empty()
}

在刪除topic時用到的Empty函數跟flush處理邏輯類似,只不過Empty只釋放memoryMsgChan消息,而不保存它們。

topic 下的源碼基本就看完了,雖然還沒有別的部分完整的完整的串聯起來,但是也可以了解到,多個 topic 在初始化時就開啟了消息循環 goroutine,執行完 Start 後開始消息分發,如果是正常的Topic,除了默認10000的記憶體隊列,還會有個硬碟隊列。topic將收到的消息分發到管理的 channel 中.每個 topic 運行的 goroutine 比較簡單,只有一個消息分發 goroutine: messagePump.

Tags: