業務數據處理一定要單獨開線程嗎
- 2019 年 12 月 12 日
- 筆記
在 《one thread one loop 思想》一文我們介紹了一個 loop 的主要結構一般如下所示:
while (!m_bQuitFlag) { epoll_or_select_func(); handle_io_events(); handle_other_things(); }
對於一些業務邏輯處理比較簡單、不會太耗時的應用來說,handle_io_events() 方法除了收發數據也可以直接用來直接做業務的處理,即其結構如下:
void handle_io_events() { //收發數據 recv_or_send_data(); //解包並處理數據 decode_packages_and_process(); }
其中 recv_or_send_data() 方法中調用 send/recv API 進行實際的網絡數據收發。以收數據為例,收完數據存入接收緩衝區後,接下來進行解包處理,然後進行業務處理,例如一個登陸數據包,其業務就是驗證登陸的賬戶密碼是否正確、記錄其登陸行為等等。從程序函數調用堆棧來看,這些業務處理邏輯其實是直接在網絡收發數據線程中處理的。我的意思是:網絡線程調用 handle_io_events() 方法,handle_io_events() 方法調用 decode_packages_and_process() 方法,decode_packages_and_process() 方法做具體的業務邏輯處理。
需要注意的是,為了讓網絡層與業務層脫耦,網絡層中通常會提供一些回調函數的接口,這些回調函數我們將其指向具體的業務處理函數。以 libevent 網絡庫的用法為例:
int main(int argc, char **argv) { struct event_base *base; struct evconnlistener *listener; struct event *signal_event; struct sockaddr_in sin; base = event_base_new(); memset(&sin, 0, sizeof(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(PORT); //listener_cb是我們自定義回調函數 listener = evconnlistener_new_bind(base, listener_cb, (void *)base, LEV_OPT_REUSEABLE|LEV_OPT_CLOSE_ON_FREE, -1, (struct sockaddr*)&sin, sizeof(sin)); if (!listener) { fprintf(stderr, "Could not create a listener!n"); return 1; } //signal_cb是我們自定義回調函數 signal_event = evsignal_new(base, SIGINT, signal_cb, (void *)base); if (!signal_event || event_add(signal_event, NULL)<0) { fprintf(stderr, "Could not create/add a signal event!n"); return 1; } //啟動loop event_base_dispatch(base); evconnlistener_free(listener); event_free(signal_event); event_base_free(base); printf("donen"); return 0; }
上述代碼根據 libevent 自帶的 helloworld 示例修改而來,其中 listener_cb 和 signal_cb 是自定義的回調函數,有相應的事件觸發後,libevent 的事件循環會調用我們設置的回調,在這些回調函數中,我們可以編寫自己的業務邏輯代碼。
這種基本的服務器結構,我們可以繪製成如下流程圖:

這是這個結構的最基本邏輯,在這基礎上可以延伸出很多變體。不知道讀者有沒有發現,上述流程圖中第三步解包和業務邏輯處理這一步中(位於 handle_io_events() 中的 decode_packages_and_process() 方法中),如果業務邏輯處理過程比較耗時(例如,從數據庫取大量數據、寫文件),那麼會導致 網絡線程在這個步驟停留時間很長,導致很久以後才能執行下一次循環,影響網絡數據的檢測和收發,最終導致整個程序的效率低下。
因此,對於這種情形,我們需要將業務處理邏輯單獨拆出來交給另外的業務工作線程處理,業務工作線程可以是一個線程池,這個過程業務數據從網絡線程組流向業務線程組。
這樣的程序結構圖如下圖所示:

上圖中,對於網絡線程將業務數據包交給業務線程,可以使用一個共享的業務數據隊列來實現,此時網絡線程是生產者,業務線程從業務數據隊列中取出任務去處理,業務線程是消費者。業務線程處理完成後如果需要將結果數據發出去,則再將數據交給網絡線程。這裡處理後的數據從業務線程再次流向網絡線程,那麼如何將數據從業務線程交給網絡線程呢?這裡以發數據為例,一般有三種方法:
方法一
直接調用相應的的發數據的方法,如果你的網絡線程本身也會調用這些發數據的方法,那麼此時就可能會出現網絡線程和業務線程同時對發方法進行調用,相當於多個線程同時調用 socket send 函數,這樣可能會導致同一個連接上的數據順序有問題,此時的做法時,利用鎖機制,同一時刻只有一個線程可以調用 socket send 方法。這裡給出一段偽代碼,假設 TcpConnection 對象表示某路連接,無論網絡線程還是業務線程處理完數據後需要發送數據,則使用:
void TcpConnection::sendData(const std::string& data) { //加上鎖 std::lock_guard<std::mutex> scoped_lock(m_mutexForConnection); //在這裡調用 send }
方法一的做法在設計上來說,存在讓人不滿意的地方,即數據發送應該屬於網絡層自己的事情,而不是其他模塊(這裡指的是業務線程)強行搶奪過來越俎代庖。
方法二
前面章節介紹了存在定時器結構的情況,網絡線程結構變成如下流程:
while (!m_bQuitFlag) { check_and_handle_timers(); epoll_or_select_func(); handle_io_events(); }
業務線程可以將需要發送的數據放入另外一個共享區域中(例如相應的 TcpConnection 對象的一個成員變量中),定時器定時從這個共享區域取出來,再發送出去,這種方案的優點是網絡線程做了它該做的事情,缺點是需要添加定時器,讓程序邏輯變得複雜,且定時器是每隔一段時間才會觸發,發送的數據可能會有一定的延遲。
方法三
利用線程執行流中的 handle_other_things() 方法,再來看下前面章節中介紹的基本結構:
while (!m_bQuitFlag) { epoll_or_select_func(); handle_io_events(); handle_other_things(); }
我們在《one thread one loop 思想》章節介紹了 handle_other_things() 函數可以做一些「其他事情」,這個函數可以在需要執行時通過前面章節介紹的喚醒機制立即被喚醒執行。業務線程將數據放入某個共享區域中(這一步和方法二介紹的一樣),然後添加 "other_things" ,在 handle_other_things() 中執行數據的發送。
如果讀者能清晰明白地看到這裡,說明您大致明白了一個不錯的服務器框架是怎麼回事了。上面介紹的服務器結構是目前主流的基於 Reactor 模式的服務程序的通用結構,例如 libevent、libuv。
如果讀者有興趣,咱們可以再進一步深入討論一下。
實際應用中,很多程序的業務邏輯處理其實是不耗時的,也就是說這些業務邏輯處理速度很快。由於 CPU 核數有限,當線程數量超過 CPU 數量時,各個線程(網絡線程和業務線程)也不是真正地並行執行,那麼即使開了一組業務線程也不一定能真正地並發執行,而業務邏輯處理並不耗時,不會影響網絡線程的執行效率,那麼我們不如就在網絡線程裏面直接處理。
上文介紹了在 handle_io_events() 方法中直接處理,如果處理的業務邏輯會產生新的其他任務,那麼我們可以投遞 "other_things",最終交給 handle_other_things() 方法來處理。此時的服務器程序結構如下:
特別說明一下:這種方式僅限於 handle_io_events() 或 handle_other_things() 裏面不會有耗時的邏輯,才可以替代專門開業務線程,如果有耗時操作還得老老實實單獨開業務線程。雖然線程數量超過 CPU 數量時,各個線程不會得到真正的並行,但那是操作系統線程調度的事情了,應用層開發不必關心這點。