Redis 源碼簡潔剖析 11 – 主 IO 執行緒及 Redis 6.0 多 IO 執行緒
Redis 到底是不是單執行緒的程式?
Redis 只有在處理「客戶端請求」時,是單執行緒的;整個 Redis server 不是單執行緒的,還有後台執行緒在輔助處理任務。
Redis 選擇單執行緒處理請求,是因為 Redis 操作的是「記憶體」
,加上設計了「高效」的數據結構,所以操作速度極快
,利用 IO 多路復用機制
,單執行緒依舊可以有非常高的性能。
Redis 不讓主執行緒執行一些耗時操作,比如同步寫、刪除等,而是交給後台執行緒非同步完成,從而避免了對主執行緒的阻塞。
在 2020 年 5 月推出的 Redis 6.0 版本中,還會使用多執行緒
來處理 IO 任務,能夠充分利用伺服器的多核特性
,使用多核運行多執行緒,讓多執行緒幫助加速數據讀取
、命令解析
和數據寫回
的速度,提升 Redis 的整體性能。
多 IO 執行緒的初始化
在 main 函數中,會調用 InitServerLast 函數,Redis 6.0 源碼:
void InitServerLast() {
bioInit();
// 初始化 IO 執行緒
initThreadedIO();
set_jemalloc_bg_thread(server.jemalloc_bg_thread);
server.initial_memory_usage = zmalloc_used_memory();
}
在調用了 bioInit 函數後,又調用了 initThreadedIO 函數初始化多 IO 執行緒。initThreadedIO
函數在 networking.c
文件中。
void initThreadedIO(void) {
// IO 執行緒激活標誌:設置為「未激活」
server.io_threads_active = 0;
// 只有 1 個 io 執行緒,直接返回,直接在主執行緒處理 IO
if (server.io_threads_num == 1) return;
if (server.io_threads_num > IO_THREADS_MAX_NUM) {
serverLog(LL_WARNING,"Fatal: too many I/O threads configured. "
"The maximum number is %d.", IO_THREADS_MAX_NUM);
exit(1);
}
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
io_threads_list[i] = listCreate();
// Thread 0 是主執行緒
if (i == 0) continue;
/* Things we do only for the additional threads. */
pthread_t tid;
// 初始化 io_threads_mutex
pthread_mutex_init(&io_threads_mutex[i],NULL);
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]); /* Thread will be stopped. */
// pthread_create 創建 IO 執行緒,執行緒運行函數是 IOThreadMain
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
// 初始化 io_threads 數組,設置值為執行緒標識
io_threads[i] = tid;
}
}
程式碼中首先判斷 io_threads_num:
- io_threads_num = 1,表示直接在主執行緒處理,直接返回
- io_threads_num > IO_THREADS_MAX_NUM,表示 IO 執行緒數量>宏定義的值(默認值 128),直接退出程式
initThreadedIO 函數就會給以下四個數組進行初始化操作:
io_threads_list
數組:保存了每個 IO 執行緒要處理的客戶端,將數組每個元素初始化為一個 List 類型的列表io_threads_pending
數組:保存等待每個 IO 執行緒處理的客戶端個數io_threads_mutex
數組:保存執行緒互斥鎖io_threads
數組:保存每個 IO 執行緒的描述符
這四個數組的定義都在 networking.c 文件中:
pthread_t io_threads[IO_THREADS_MAX_NUM]; //記錄執行緒描述符的數組
pthread_mutex_t io_threads_mutex[IO_THREADS_MAX_NUM]; //記錄執行緒互斥鎖的數組
_Atomic unsigned long io_threads_pending[IO_THREADS_MAX_NUM]; //記錄執行緒待處理的客戶端個數
list *io_threads_list[IO_THREADS_MAX_NUM]; //記錄執行緒對應處理的客戶端
initThreadedIO 函數在 for 循環中,調用 pthread_create 函數創建執行緒。pthread_create 詳細語法見:pthread_create(3) — Linux manual page。
創建的執行緒要運行的函數是 IOThreadMain,*arg 參數就是當前創建執行緒的編號(從 1 開始,0 是主 IO 執行緒)。
/* Spawn and initialize the I/O threads. */
for (int i = 0; i < server.io_threads_num; i++) {
io_threads_list[i] = listCreate();
// Thread 0 是主執行緒
if (i == 0) continue;
/* Things we do only for the additional threads. */
pthread_t tid;
// 初始化 io_threads_mutex
pthread_mutex_init(&io_threads_mutex[i],NULL);
setIOPendingCount(i, 0);
pthread_mutex_lock(&io_threads_mutex[i]);
// pthread_create 創建 IO 執行緒,執行緒運行函數是 IOThreadMain
if (pthread_create(&tid,NULL,IOThreadMain,(void*)(long)i) != 0) {
serverLog(LL_WARNING,"Fatal: Can't initialize IO thread.");
exit(1);
}
// 初始化 io_threads 數組,設置值為執行緒標識
io_threads[i] = tid;
}
IO 執行緒運行函數 IOThreadMain
主要邏輯是一個 while(1) 的循環,會把 io_threads_list
在這個執行緒對應的元素取出來,判斷並處理。
void *IOThreadMain(void *myid) {
……
while(1) {
/* Wait for start */
for (int j = 0; j < 1000000; j++) {
if (getIOPendingCount(id) != 0) break;
}
……
// 獲取 IO 執行緒要處理的客戶端列表
listRewind(io_threads_list[id],&li);
while((ln = listNext(&li))) {
// 從客戶端列表中獲取一個客戶端
client *c = listNodeValue(ln);
// 執行緒是「寫操作」,調用 writeToClient 將數據寫回客戶端
if (io_threads_op == IO_THREADS_OP_WRITE) {
writeToClient(c,0);
// 如果是『讀操作』,調用 readQueryFromClient 從客戶端讀數據
} else if (io_threads_op == IO_THREADS_OP_READ) {
readQueryFromClient(c->conn);
}
……
}
// 處理完所有客戶端,清空該執行緒的客戶端列表
listEmpty(io_threads_list[id]);
// 將該執行緒的待處理任務數量設為 0
setIOPendingCount(id, 0);
}
}
註:上面程式碼中 io_threads_op
變數是在 handleClientsWithPendingWritesUsingThreads
函數和 handleClientsWithPendingReadsUsingThreads
函數中設置的。
問題:IO 執行緒要處理的客戶端是如何添加到 io_threads_list 數組中的呢?
是在 redisServer 全局變數里,有兩個 List 類型的成員變數:
clients_pending_write
:待寫回數據的客戶端clients_pending_read
:待讀取數據的客戶端
struct redisServer {
...
// 待寫回數據的客戶端
list *clients_pending_write;
// 待讀取數據的客戶端
list *clients_pending_read;
...
}
Redis server 在接收到客戶端請求、返回給客戶端數據的過程中,會根據一定條件,推遲客戶端的讀寫操作
,並分別把待讀寫的客戶端保存到這兩個列表中。之後 Redis server 每次進入事件循環前,都會把列表中的客戶端添加到 io_threads_list 數組中,交給 IO 執行緒處理。
如何推遲客戶端「讀」操作?
處理可讀事件的回調函數是 readQueryFromClient。
void readQueryFromClient(connection *conn) {
// 從 connection 結構中獲取客戶端
client *c = connGetPrivateData(conn);
……
// 是否推遲從客戶端讀取數據(使用多執行緒 IO 時)
if (postponeClientRead(c)) return;
……
}
主要看下 postponeClientRead 函數。
int postponeClientRead(client *c) {
if (server.io_threads_active &&
server.io_threads_do_reads &&
!ProcessingEventsWhileBlocked &&
!(c->flags & (CLIENT_MASTER|CLIENT_SLAVE|CLIENT_PENDING_READ|CLIENT_BLOCKED)))
{
// 客戶端 flag 添加 CLIENT_PENDING_READ 標記,推遲客戶端的讀操作
c->flags |= CLIENT_PENDING_READ;
// 將客戶端添加到 server 的 clients_pending_read 列表中
listAddNodeHead(server.clients_pending_read,c);
return 1;
} else {
return 0;
}
}
if 的判斷條件:是否可以推遲當前客戶端的讀操作;if 塊里的執行邏輯:將客戶端添加到 clients_pending_read 列表中。下面主要看下判斷條件:
server.io_threads_active = 1
:多 IO 執行緒已激活。server.io_threads_do_reads = 1
:多 IO 執行緒可用於處理延遲執行的客戶端讀操作,是在 Redis 配置文件 redis.conf 中,通過配置項 。io-threads-do-reads 設置的,默認值為 no。ProcessingEventsWhileBlocked = 0
:ProcessingEventsWhileBlocked 函數沒有在執行,當 Redis 在讀取 RDB 文件或 AOF 文件時,會調用這個函數,用來處理事件驅動框架捕獲到的事件,避免因讀取 RDB 或 AOF 文件造成 Redis 阻塞。- 客戶端現有標識不能有
CLIENT_MASTER
、CLIENT_SLAVE
和CLIENT_PENDING_READ
- CLIENT_MASTER:客戶端用於主從複製
- CLIENT_SLAVE:客戶端用於主從複製
- CLIENT_PENDING_READ:客戶端本來就被設置為推遲讀操作
如何推遲客戶端「寫」操作?
Redis 在執行了客戶端命令,要給客戶端返回結果時,會調用 addReply
函數將待返回的結果寫入輸出緩衝區。addReply 函數開始就會調用 prepareClientToWrite 函數。
/* -----------------------------------------------------------------------------
* Higher level functions to queue data on the client output buffer.
* The following functions are the ones that commands implementations will call.
* -------------------------------------------------------------------------- */
/* Add the object 'obj' string representation to the client output buffer. */
void addReply(client *c, robj *obj) {
if (prepareClientToWrite(c) != C_OK) return;
……
}
prepareClientToWrite
函數的注釋如下:
/* This function is called every time we are going to transmit new data
* to the client. The behavior is the following:
*
* If the client should receive new data (normal clients will) the function
* returns C_OK, and make sure to install the write handler in our event
* loop so that when the socket is writable new data gets written.
*
* If the client should not receive new data, because it is a fake client
* (used to load AOF in memory), a master or because the setup of the write
* handler failed, the function returns C_ERR.
*
* The function may return C_OK without actually installing the write
* event handler in the following cases:
*
* 1) The event handler should already be installed since the output buffer
* already contains something.
* 2) The client is a slave but not yet online, so we want to just accumulate
* writes in the buffer but not actually sending them yet.
*
* Typically gets called every time a reply is built, before adding more
* data to the clients output buffers. If the function returns C_ERR no
* data should be appended to the output buffers. */
int prepareClientToWrite(client *c) {
……
// 當前客戶端沒有待寫回數據 && flag 不包含 CLIENT_PENDING_READ
if (!clientHasPendingReplies(c) && !(c->flags & CLIENT_PENDING_READ))
clientInstallWriteHandler(c);
return C_OK;
}
clientInstallWriteHandler 如下,if 判斷條件就不贅述了。
void clientInstallWriteHandler(client *c) {
if (!(c->flags & CLIENT_PENDING_WRITE) &&
(c->replstate == REPL_STATE_NONE ||
(c->replstate == SLAVE_STATE_ONLINE && !c->repl_put_online_on_ack)))
{
// 將客戶端的標識設置為 CLIENT_PENDING_WRITE(待寫回)
c->flags |= CLIENT_PENDING_WRITE;
// 將 client 加入 server 的 clients_pending_write 列表
listAddNodeHead(server.clients_pending_write,c);
}
}
上面介紹如如何推遲客戶端的讀操作、寫操作,那 Redis 是如何將推遲讀寫操作的客戶端,分配給多 IO 執行緒執行的呢?是通過:
handleClientsWithPendingReadsUsingThreads 函數
:將 clients_pending_read 列表中的客戶端分配給 IO 執行緒handleClientsWithPendingWritesUsingThreads 函數
:將 clients_pending_write 列表中的客戶端分配給 IO 執行緒
如何把待「讀」客戶端分配給 IO 執行緒執行?
beforeSleep 函數中調用了 handleClientsWithPendingReadsUsingThreads 函數:
/* We should handle pending reads clients ASAP after event loop. */
handleClientsWithPendingReadsUsingThreads();
handleClientsWithPendingReadsUsingThreads
函數如下,邏輯都在注釋中:
/* When threaded I/O is also enabled for the reading + parsing side, the
* readable handler will just put normal clients into a queue of clients to
* process (instead of serving them synchronously). This function runs
* the queue using the I/O threads, and process them in order to accumulate
* the reads in the buffers, and also parse the first command available
* rendering it in the client structures. */
int handleClientsWithPendingReadsUsingThreads(void) {
// 判斷 io_threads_active 是否被激活,io_threads_do_reads 是否可以用 IO 執行緒處理待讀客戶端
if (!server.io_threads_active || !server.io_threads_do_reads) return 0;
// 判斷 clients_pending_read 長度
int processed = listLength(server.clients_pending_read);
if (processed == 0) return 0;
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
// 獲取 clients_pending_read 的客戶端列表
listRewind(server.clients_pending_read,&li);
// 輪詢方式,將客戶端分配給 IO 執行緒
int item_id = 0;
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 將 IO 執行緒的操作標識設置為「讀操作」
io_threads_op = IO_THREADS_OP_READ;
for (int j = 1; j < server.io_threads_num; j++) {
// 每個執行緒等待處理的客戶端數量 → io_threads_pending 數組
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
// 處理 0 號執行緒(主執行緒)的待讀客戶端
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
readQueryFromClient(c->conn);
}
// 清空 0 號列表
listEmpty(io_threads_list[0]);
// 循環,等待其他所有 IO 執行緒的待讀客戶端都處理完
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
/* Run the list of clients again to process the new buffers. */
// 取出 clients_pending_read 列表
while(listLength(server.clients_pending_read)) {
ln = listFirst(server.clients_pending_read);
client *c = listNodeValue(ln);
// 判斷客戶端標識符是否有 CLIENT_PENDING_READ,有則表示被 IO 執行緒解析過
c->flags &= ~CLIENT_PENDING_READ;
// 將客戶端從 clients_pending_read 列表中刪掉
listDelNode(server.clients_pending_read,ln);
serverAssert(!(c->flags & CLIENT_BLOCKED));
if (processPendingCommandsAndResetClient(c) == C_ERR) {
/* If the client is no longer valid, we avoid
* processing the client later. So we just go
* to the next. */
continue;
}
// 解析並執行客戶端的所有命令
processInputBuffer(c);
/* We may have pending replies if a thread readQueryFromClient() produced
* replies and did not install a write handler (it can't).
*/
if (!(c->flags & CLIENT_PENDING_WRITE) && clientHasPendingReplies(c))
clientInstallWriteHandler(c);
}
/* Update processed count on server */
server.stat_io_reads_processed += processed;
return processed;
}
如何把待「寫」客戶端分配給 IO 執行緒執行?
待寫客戶端的分配處理是由 handleClientsWithPendingWritesUsingThreads
函數完成的,該函數也是在 beforeSleep
函數中調用的。邏輯和 handleClientsWithPendingReadsUsingThreads 函數很像。
int handleClientsWithPendingWritesUsingThreads(void) {
// 判斷 clients_pending_write 列表的數量
int processed = listLength(server.clients_pending_write);
if (processed == 0) return 0;
// 只有主 IO 執行緒 || 不使用 IO 執行緒
if (server.io_threads_num == 1 || stopThreadedIOIfNeeded()) {
return handleClientsWithPendingWrites();
}
/* Start threads if needed. */
if (!server.io_threads_active) startThreadedIO();
/* Distribute the clients across N different lists. */
listIter li;
listNode *ln;
listRewind(server.clients_pending_write,&li);
int item_id = 0;
// 把待寫客戶端,按照輪詢方式分配給 IO 執行緒
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
c->flags &= ~CLIENT_PENDING_WRITE;
if (c->flags & CLIENT_CLOSE_ASAP) {
listDelNode(server.clients_pending_write, ln);
continue;
}
int target_id = item_id % server.io_threads_num;
listAddNodeTail(io_threads_list[target_id],c);
item_id++;
}
// 將 IO 執行緒的操作標識設置為「寫操作」
io_threads_op = IO_THREADS_OP_WRITE;
for (int j = 1; j < server.io_threads_num; j++) {
// 每個執行緒等待處理的客戶端數量 → io_threads_pending 數組
int count = listLength(io_threads_list[j]);
setIOPendingCount(j, count);
}
/* Also use the main thread to process a slice of clients. */
listRewind(io_threads_list[0],&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
writeToClient(c,0);
}
listEmpty(io_threads_list[0]);
// 循環,等待其他所有 IO 執行緒的待寫客戶端都處理完
while(1) {
unsigned long pending = 0;
for (int j = 1; j < server.io_threads_num; j++)
pending += getIOPendingCount(j);
if (pending == 0) break;
}
/* Run the list of clients again to install the write handler where
* needed. */
listRewind(server.clients_pending_write,&li);
while((ln = listNext(&li))) {
client *c = listNodeValue(ln);
// 再次檢查是否有待寫客戶端
if (clientHasPendingReplies(c) &&
connSetWriteHandler(c->conn, sendReplyToClient) == AE_ERR)
{
freeClientAsync(c);
}
}
listEmpty(server.clients_pending_write);
/* Update processed count on server */
server.stat_io_writes_processed += processed;
return processed;
}
需要注意的是,stopThreadedIOIfNeeded
函數中會判斷待寫入的客戶端數量如果 < IO 執行緒數 * 2
,則也會直接返回,直接使用主 IO 執行緒處理待寫客戶端。這是因為待寫客戶端不多時,使用多執行緒效率反而會下降。
int stopThreadedIOIfNeeded(void) {
int pending = listLength(server.clients_pending_write);
/* Return ASAP if IO threads are disabled (single threaded mode). */
if (server.io_threads_num == 1) return 1;
if (pending < (server.io_threads_num*2)) {
if (server.io_threads_active) stopThreadedIO();
return 1;
} else {
return 0;
}
}
總結
Redis 6.0 實現的多 IO 執行緒機制
,主要是使用多個 IO 執行緒,並發處理客戶端讀取數據
、解析命令
、寫回數據
,充分利用伺服器的多核
特性,提高 IO 效率。
Redis server 會根據 readQueryFromClient
函數調用 postponeClientRead 函數決定是否要推遲客戶端操作;會根據 addReply
函數中的 prepareClientToWrite 函數,決定是否推遲客戶端的寫操作。待讀客戶端加入到 clients_pending_read 列表,待寫客戶端加入 clients_pending_write 列表。
IO 執行緒創建之後,會一直檢測 io_threads_list
列表,如果有待讀寫的客戶端,IO 執行緒就會調用 readQueryFromClient 或 writeToClient 函數進行處理。
但是多 IO 執行緒並不會執行命令,執行命令
仍然在主 IO 執行緒
。
參考鏈接
- 極客時間:12 | Redis 真的是單執行緒嗎?
- 極客時間:13 | Redis 6.0 多 IO 執行緒的效率提高了嗎?
- pthread_create(3) — Linux manual page。
Redis 源碼簡潔剖析系列
Java 編程思想-最全思維導圖-GitHub 下載鏈接,需要的小夥伴可以自取~
原創不易,希望大家轉載時請先聯繫我,並標註原文鏈接。