nsq topic
Topic相關的程式碼主要位於nsqd/topic.go
中。
上一篇文字我們講解了下nsq的啟動流程。對nsq的整體框架有了一個大概的了解。本篇文章就是由大到小。對於topic這一部分進行詳盡的講解。
topic 管理著多個 channel 通過從 client 中獲取消息,然後將消息發送到 channel 中傳遞給客戶端.在 channel 初始化時會載入原有的 topic 並在最後統一執行 topic.Start(),新創建的 topic 會同步給 lookupd 後開始運行. nsqd 中通過創建創建多個 topic 來管理不同類別的頻道.
topic結構體:
type Topic struct {
// 64bit atomic vars need to be first for proper alignment on 32bit platforms
// 這兩個欄位僅作統計資訊,保證 32 位對其操作
messageCount uint64 // 累計消息數
messageBytes uint64// 累計消息體的位元組數
sync.RWMutex // 加鎖,包括 putMessage
name string // topic名,生產和消費時需要指定此名稱
channelMap map[string]*Channel // 保存每個channel name和channel指針的映射
backend BackendQueue // 磁碟隊列,當記憶體memoryMsgChan滿時,寫入硬碟隊列
memoryMsgChan chan *Message // 消息優先存入這個記憶體chan
startChan chan int // 接收開始訊號的 channel,調用 start 開始 topic 消息循環
exitChan chan int // 判斷 topic 是否退出
// 在 select 的地方都要添加 exitChan
// 除非使用 default 或者保證程式不會永遠阻塞在 select 處,即可以退出循環
// channel 更新時用來通知並更新消息循環中的 chan 數組
channelUpdateChan chan int
// 用來等待所有的子 goroutine
waitGroup util.WaitGroupWrapper
exitFlag int32 // topic 退出標識符
idFactory *guidFactory // 生成 guid 的工廠方法
ephemeral bool // 該 topic 是否是臨時 topic
deleteCallback func(*Topic) // topic 刪除時的回調函數
deleter sync.Once // 確保 deleteCallback 僅執行一次
paused int32 // topic 是否暫停
pauseChan chan int // 改變 topic 暫停/運行狀態的通道
ctx *context // topic 的上下文
}
可以看到。topic 採用了 map + *Channel 來管理所有的channel. 並且也有 memoryMsgChan 和 backend 2個隊列。
實例化Topic :
下面就是 topic 的創建流程,傳入的參數參數包括,topicName,上下文環境,刪除回調函數:
func NewTopic(topicName string, ctx *context, deleteCallback func(*Topic)) *Topic {
t := &Topic{
name: topicName, //topic名稱
channelMap: make(map[string]*Channel),
memoryMsgChan: nil,
startChan: make(chan int, 1),
exitChan: make(chan int),
channelUpdateChan: make(chan int),
ctx: ctx, //上下文指針
paused: 0,
pauseChan: make(chan int),
deleteCallback: deleteCallback, //刪除callback函數
// 所有 topic 使用同一個 guidFactory,因為都是用的 nsqd 的 ctx.nsqd.getOpts().ID 為基礎生成的
idFactory: NewGUIDFactory(ctx.nsqd.getOpts().ID),
}
// create mem-queue only if size > 0 (do not use unbuffered chan)
// // 根據消息隊列生成消息 chan,default size = 10000
if ctx.nsqd.getOpts().MemQueueSize > 0 {
// 初始化一個消息隊列
t.memoryMsgChan = make(chan *Message, ctx.nsqd.getOpts().MemQueueSize)
}
// 判斷這個 topic 是不是暫時的,暫時的 topic 消息僅僅存儲在記憶體中
// DummyBackendQueue 和 diskqueue 均實現了 backend 介面
if strings.HasSuffix(topicName, "#ephemeral") {
// 臨時的 topic,設置標誌並使用 newDummyBackendQueue 初始化 backend
t.ephemeral = true
t.backend = newDummyBackendQueue() // 實現了 backend 但是並沒有邏輯,所有操作僅僅返回 nil
} else {
dqLogf := func(level diskqueue.LogLevel, f string, args ...interface{}) {
opts := ctx.nsqd.getOpts()
lg.Logf(opts.Logger, opts.LogLevel, lg.LogLevel(level), f, args...)
}
// 使用 diskqueue 初始化 backend 隊列
t.backend = diskqueue.New(
topicName,
ctx.nsqd.getOpts().DataPath,
ctx.nsqd.getOpts().MaxBytesPerFile,
int32(minValidMsgLength),
int32(ctx.nsqd.getOpts().MaxMsgSize)+minValidMsgLength,
ctx.nsqd.getOpts().SyncEvery,
ctx.nsqd.getOpts().SyncTimeout,
dqLogf,
)
}
// 使用一個新的協程來執行 messagePump
//startChan 就發送給了它,messagePump 函數負責分發整個 topic 接收到的消息給該 topic 下的 channels.
t.waitGroup.Wrap(t.messagePump)
// 調用 Notify
t.ctx.nsqd.Notify(t)
return t
}
可以看到先實例化了一個Topic指針對象。初始化memoryMsgChan
隊列, 默認1000個。並且判斷topicName是否是臨時topic,如果是的話,BackendQueue
(這是個介面)實現了一個空的記憶體Queue. 否則使用 diskqueue
來初始化 backend隊列。
隨後,NewTopic
函數開啟一個新的goroutine來執行messagePump
函數,該函數負責消息循環,將進入topic中的消息投遞到channel中。
最後,NewTopic
函數執行t.ctx.nsqd.Notify(t)
,該函數在topic和channel創建、停止的時候調用, Notify
函數通過執行PersistMetadata
函數,將topic和channel的資訊寫到文件中。
func (n *NSQD) Notify(v interface{}) {
persist := atomic.LoadInt32(&n.isLoading) == 0
n.waitGroup.Wrap(func() {
// by selecting on exitChan we guarantee that
// we do not block exit, see issue #123
select {
//如果執行那一刻 有exitChan 那麼就走exit
case <-n.exitChan:
//否則就走正常邏輯 往notifyChan 里發個消息
case n.notifyChan <- v:
if !persist {
return
}
n.Lock()
err := n.PersistMetadata()
if err != nil {
n.logf(LOG_ERROR, "failed to persist metadata - %s", err)
}
n.Unlock()
}
})
}
在Notify
函數的實現時,首先考慮了數據持久化的時機,如果當前nsqd尚在初始化,則不需要立即持久化數據,因為nsqd在初始化後會進行一次統一的持久化工作,
Notify
在進行數據持久化的時候採用了非同步的方式。使得topic和channel能以同步的方式來調用Nofity而不阻塞。在非同步運行的過程中, 通過waitGroup
和監聽exitChan
的使用保證了結束程式時goroutine能正常退出。
在執行持久化之前,case n.notifyChan <- v:
語句向notifyChan
傳遞消息,觸發lookupLoop
函數(nsqd/lookup.go
中)接收notifyChan
消息的部分, 從而實現向loopupd
註冊/取消註冊響應的topic或channel。
消息寫入Topic
客戶端通過nsqd的HTTP API或TCP API向特定topic發送消息,nsqd的HTTP或TCP模組通過調用對應topic的PutMessage
或PutMessages
函數, 將消息投遞到topic中。PutMessage
或PutMessages
函數都通過topic的私有函數put
進行消息的投遞,兩個函數的區別僅在PutMessage
只調用一次put
, PutMessages
遍歷所有要投遞的消息,對每條消息使用put
函數進行投遞。默認topic會優先往memoryMsgChan
隊列內投遞,如果記憶體隊列已滿,才會往磁碟隊列寫入,(臨時的topic磁碟隊列不做任何存儲,數據直接丟棄)
func (t *Topic) put(m *Message) error {
select {
case t.memoryMsgChan <- m:
default:
//寫入磁碟隊列
}
return nil
}
Start && messagePump 操作
topic的Start方法就是發送了個 startChan ,這裡有個小技巧,nsq使用了select來發送這個消息,這樣做的目的是如果start被並發調用了,第二個start會直接走到default里,什麼都不做.
那麼這個Start函數都有哪裡調用的呢。
1、 nsqd啟動的時候,觸發LoadMetadata
會把文件里的topic載入到記憶體里,這時候會調用Start方法
2、 用戶通過請求獲取topic的時候會通過 getTopic 來獲取或者創建topic
func (t *Topic) Start() {
select {
case t.startChan <- 1:
default:
}
}
接下來我們看下 messagePump
, 剛才的 startChan 就是發給了這個函數,該函數在創建新的topic時通過waitGroup
在新的goroutine中運行。該函數僅在觸發 startChan 開始運行,否則會阻塞住,直到退出。
for {
select {
case <-t.channelUpdateChan:
continue
case <-t.pauseChan:
continue
case <-t.exitChan:
goto exit
case <-t.startChan:
}
break
}
messagePump
函數初始化時先獲取當前存在的channel數組,設置memoryMsgChan
和backendChan
,隨後進入消息循環, 在循環中主要處理四種消息:
-
接收來自
memoryMsgChan
和backendChan
兩個go channel進入的消息,並向當前的channal數組中的channel進行投遞 -
處理當前topic下channel的更新
-
處理當前topic的暫停和恢復
-
監聽當前topic的刪除
消息投遞
case msg = <-memoryMsgChan:
case buf = <-backendChan:
msg, err = decodeMessage(buf)
if err != nil {
t.ctx.nsqd.logf("ERROR: failed to decode message - %s", err)
continue
}
這兩個case語句處理進入topic的消息,關於兩個go channel的區別會在後續的部落格中分析。 從memoryMsgChanbackendChan
讀取到的消息是*Message
類型,而從backendChan
讀取到的消息是byte
數組的。 因此取出backendChan
的消息後海需要調用decodeMessage
函數對byte
數組進行解碼,返回*Message
類型的消息。 二者都保存在msg
變數中。
for i, channel := range chans {
chanMsg := msg
if i > 0 {
chanMsg = NewMessage(msg.ID, msg.Body)
chanMsg.Timestamp = msg.Timestamp
chanMsg.deferred = msg.deferred
}
if chanMsg.deferred != 0 {
channel.StartDeferredTimeout(chanMsg, chanMsg.deferred)
continue
}
err := channel.PutMessage(chanMsg)
if err != nil {
t.ctx.nsqd.logf(
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s",
t.name, msg.ID, channel.name, err)
}
}
隨後是將消息投到每個channel中,首先先對消息進行複製操作,這裡有個優化,對於第一次循環, 直接使用原消息進行發送以減少複製對象的開銷,此後的循環將對消息進行複製。對於即時的消息, 直接調用channel的PutMessage
函數進行投遞,對於延遲的消息, 調用channel的StartDeferredTimeout
函數進行投遞。對於這兩個函數的投遞細節,後續博文中會詳細分析。
Topic下Channel的更新
case <-t.channelUpdateChan:
chans = chans[:0]
t.RLock()
for _, c := range t.channelMap {
chans = append(chans, c)
}
t.RUnlock()
if len(chans) == 0 || t.IsPaused() {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
Channel的更新比較簡單,從channelMap
中取出每個channel,構成channel的數組以便後續進行消息的投遞。 並且根據當前是否有channel以及該topic是否處於暫停狀態來決定memoryMsgChan
和backendChan
是否為空。
Topic的暫停和恢復
case pause := <-t.pauseChan:
if pause || len(chans) == 0 {
memoryMsgChan = nil
backendChan = nil
} else {
memoryMsgChan = t.memoryMsgChan
backendChan = t.backend.ReadChan()
}
continue
這個case既處理topic的暫停也處理topic的恢復,pause
變數決定其究竟是哪一種操作。 Topic的暫停和恢復其實和topic的更新很像,根據是否暫停以及是否有channel來決定是否分配memoryMsgChan
和backendChan
。
messagePump函數的退出
case <-t.exitChan:
goto exit
// ...
exit:
t.ctx.nsqd.logf("TOPIC(%s): closing ... messagePump", t.name)
}
// End of messagePump
messagePump
通過監聽exitChan
來獲知topic是否被刪除,當topic的刪除時,跳轉到函數的最後,輸出日誌後退出消息循環。
Topic的關閉和刪除
// Delete empties the topic and all its channels and closes
func (t *Topic) Delete() error {
return t.exit(true)
}
// Close persists all outstanding topic data and closes all its channels
func (t *Topic) Close() error {
return t.exit(false)
}
func (t *Topic) exit(deleted bool) error {
if !atomic.CompareAndSwapInt32(&t.exitFlag, 0, 1) {
return errors.New("exiting")
}
if deleted {
t.ctx.nsqd.logf("TOPIC(%s): deleting", t.name)
// since we are explicitly deleting a topic (not just at system exit time)
// de-register this from the lookupd
t.ctx.nsqd.Notify(t)
} else {
t.ctx.nsqd.logf("TOPIC(%s): closing", t.name)
}
close(t.exitChan)
// synchronize the close of messagePump()
t.waitGroup.Wait()
if deleted {
t.Lock()
for _, channel := range t.channelMap {
delete(t.channelMap, channel.name)
channel.Delete()
}
t.Unlock()
// empty the queue (deletes the backend files, too)
t.Empty()
return t.backend.Delete()
}
// close all the channels
for _, channel := range t.channelMap {
err := channel.Close()
if err != nil {
// we need to continue regardless of error to close all the channels
t.ctx.nsqd.logf("ERROR: channel(%s) close - %s", channel.name, err)
}
}
// write anything leftover to disk
t.flush()
return t.backend.Close()
}
// Exiting returns a boolean indicating if this topic is closed/exiting
func (t *Topic) Exiting() bool {
return atomic.LoadInt32(&t.exitFlag) == 1
}
Topic關閉和刪除的實現都是調用exit
函數,只是傳遞的參數不同,刪除時調用exit(true)
,關閉時調用exit(false)
。 exit
函數進入時通過atomic.CompareAndSwapInt32
函數判斷當前是否正在退出,如果不是,則設置退出標記,對於已經在退出的topic,不再重複執行退出函數。 接著對於關閉操作,使用Notify
函數通知lookupd以便其他nsqd獲知該消息。
隨後,exit
函數調用close(t.exitChan)
和t.waitGroup.Wait()
通知其他正在運行goroutine當前topic已經停止,並等待waitGroup
中的goroutine結束運行。
最後,對於刪除和關閉兩種操作,執行不同的邏輯來完成最後的清理工作:
-
對於刪除操作,需要清空
channelMap
並刪除所有channel,然後刪除記憶體和磁碟中所有未投遞的消息。最後關閉backend
管理的的磁碟文件。 -
對於關閉操作,不清空
channelMap
,只是關閉所有的channel,使用flush
函數將所有memoryMsgChan
中未投遞的消息用writeMessageToBackend
保存到磁碟中。最後關閉backend
管理的的磁碟文件。
func (t *Topic) flush() error {
//...
for {
select {
case msg := <-t.memoryMsgChan:
err := writeMessageToBackend(&msgBuf, msg, t.backend)
if err != nil {
t.ctx.nsqd.logf(
"ERROR: failed to write message to backend - %s", err)
}
default:
goto finish
}
}
finish:
return nil
}
flush
函數也使用到了default分支來檢測是否已經處理完全部消息。 由於此時已經沒有生產者向memoryMsgChan
func (t *Topic) Empty() error {
for {
select {
case <-t.memoryMsgChan:
default:
goto finish
}
}
finish:
return t.backend.Empty()
}
在刪除topic時用到的Empty
函數跟flush
處理邏輯類似,只不過Empty
只釋放memoryMsgChan
消息,而不保存它們。
topic 下的源碼基本就看完了,雖然還沒有別的部分完整的完整的串聯起來,但是也可以了解到,多個 topic 在初始化時就開啟了消息循環 goroutine,執行完 Start 後開始消息分發,如果是正常的Topic,除了默認10000的記憶體隊列,還會有個硬碟隊列。topic將收到的消息分發到管理的 channel 中.每個 topic 運行的 goroutine 比較簡單,只有一個消息分發 goroutine: messagePump.