曹工說Redis源碼(6)– redis server 主循環大體流程解析
文章導航
Redis源碼系列的初衷,是幫助我們更好地理解Redis,更懂Redis,而怎麼才能懂,光看是不夠的,建議跟着下面的這一篇,把環境搭建起來,後續可以自己閱讀源碼,或者跟着我這邊一起閱讀。由於我用c也是好幾年以前了,些許錯誤在所難免,希望讀者能不吝指出。
曹工說Redis源碼(1)– redis debug環境搭建,使用clion,達到和調試java一樣的效果
曹工說Redis源碼(2)– redis server 啟動過程解析及簡單c語言基礎知識補充
曹工說Redis源碼(3)– redis server 啟動過程完整解析(中)
曹工說Redis源碼(4)– 通過redis server源碼來理解 listen 函數中的 backlog 參數
曹工說Redis源碼(5)– redis server 啟動過程解析,以及EventLoop每次處理事件前的前置工作解析(下)
本講主題
先給大家複習下前面一講的功課,大家知道,redis 基本是單線程,也就是說,假設我們啟動main方法的,是線程A,那麼,最終,去處理客戶端socket連接、讀取客戶端請求、以及向客戶端socket寫數據,也還是線程A。
同時,大家想必也知道,redis 里還是有一些後台任務要做的,比如:
-
字典的rehash(rehash的意思是,redis 里,字典結構,其實是包含了兩個hashtable,一般使用第一個;當需要擴充其size的時候,hashtable[1] 就會擴充內存到擴充後的size,然後,就需要把hashtable[0]裏面的數據,全部遷移到 hashtable[1] 來,這個過程,即所謂的rehash),rehash的過程,還是比較耗時的;
-
redis 里的鍵,如果設了過期時間,到了過期時間後,這個key,是不是就在redis里不存在了呢?不一定,但是你去訪問的時候,肯定是看不到了。但這個怎麼做到的呢?難道每次來一個這種key,就設置一個timer,在指定過期時間後執行清除任務嗎?這個想來,開銷太大了;
所以,其實分了兩種策略:
- 一是redis 給自己開了個周期性的定時任務,就是那種,每隔30s執行一次之類的,在這個任務中,就會去主動檢查:設置了過期時間的key的集合,如果發現某個key過期了,直接刪除;但是,redis由於其單線程特性,如果遇到過期key特別多的話,就要一直忙着清理過期key了,正事就沒法幹了(比如處理客戶端請求),所以,每次redis執行這種任務的時候,基本就是敷衍了事,得過且過,隨機選幾個鍵,刪了就算完事。
- 二是,redis在你真正去get 這個key的時候,才去檢查是否過期,如果發現過期了,再刪除。這是什麼策略?就是懶。所以叫惰性刪除。
-
檢查當前的客戶端集合,看看哪些是一直空閑,且超過了一定時間的,這部分客戶端,被篩選出來,直接幹掉,關掉與該客戶端之間的長連接。
-
還有其他一些任務,下邊再說。
所以,從上面可知,redis 主要要干兩類活,一種是客戶端要它乾的,比如,我執行個get/set命令,這個優先級比較高;另一類就是例行工作,每隔多久就得干一次。
前面一講,我們已經講到了下面這個代碼:
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
// 如果有需要在事件處理前執行的函數,那麼運行它
if (eventLoop->beforesleep != NULL)
// 1
eventLoop->beforesleep(eventLoop);
// 2 開始處理事件
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
1處,我們已經講完了;本講,主要講2處,這個主循環。ok,扯了一堆,let’s go!
主循環大體流程
獲取:當前還有多長時間,到達周期任務的時間點
獲取有沒有周期任務要執行,如果有,則計算一下,要過多久,才到周期任務的執行時間;把過多久這個時間,算出來後,定義為 timeLeftToScheduledJobTime;如果沒有周期任務,這個時間可以定義為null;
如果發現時間已經到了,則表示現在就可以執行這個周期任務了,把timeLeftToScheduledJobTime 設為0
這部分代碼,如下所示:
if (eventLoop->maxfd != -1 ||
((flags & AE_TIME_EVENTS) && !(flags & AE_DONT_WAIT))) {
int j;
aeTimeEvent *shortest = NULL;
// 1
struct timeval tv, *tvp;
// 獲取最近的時間事件
if (flags & AE_TIME_EVENTS && !(flags & AE_DONT_WAIT))
// 2
shortest = aeSearchNearestTimer(eventLoop);
if (shortest) {
// 如果時間事件存在的話
// 那麼根據最近可執行時間事件和現在時間的時間差來決定文件事件的阻塞時間
long now_sec, now_ms;
// 計算距今最近的時間事件還要多久才能達到
// 並將該時間距保存在 tv 結構中
/**
* 3 獲取當前時間,這裡把兩個long 局部變量的地址傳進去了,在裏面會去修改它
*/
aeGetTime(&now_sec, &now_ms);
tvp = &tv;
// 4
tvp->tv_sec = shortest->when_sec - now_sec;
if (shortest->when_ms < now_ms) {
tvp->tv_usec = ((shortest->when_ms + 1000) - now_ms) * 1000;
tvp->tv_sec--;
} else {
tvp->tv_usec = (shortest->when_ms - now_ms) * 1000;
}
// 5 時間差小於 0 ,說明事件已經可以執行了,將秒和毫秒設為 0 (不阻塞)
if (tvp->tv_sec < 0) tvp->tv_sec = 0;
if (tvp->tv_usec < 0) tvp->tv_usec = 0;
} else {
// 執行到這一步,說明沒有時間事件
if (flags & AE_DONT_WAIT) {
// 6
tv.tv_sec = tv.tv_usec = 0;
tvp = &tv;
} else {
/* Otherwise we can block */
// 7
tvp = NULL; /* wait forever */
}
}
- 1處,定義了一個變量tvp,基本用來存儲前面我們說的那個
timeLeftToScheduledJobTime
- 2處,會獲取最近的周期任務的時間
- 3處,獲取當前時間,保存到
long now_sec, now_ms
- 4處,最近的周期任務的時間,減去當前時間,差值保存到
tvp->tv_sec
- 5處,如果最終算出來,時間差為負數,則設為0,表示,這個周期任務現在就可以運行
- 6處和7處,這是另外一個else分支,從2處分出來的,如果沒找到最近的周期任務,則進入這裡;根據參數flags中是否設置了
AE_DONT_WAIT
選項,分出2個分支,一個設為0,一個設為null。
select函數簡介
說到網絡編程中的多路復用,select幾乎是繞不開的話題,在沒有epoll之前,基本就是使用select。當然,select有它的缺點,那就是:
- select總是去線性掃描所有的文件描述符,看看哪個文件描述符是ready的;怎麼算作ready,讀或者寫,不用阻塞,就算是ready;
- select最大支持的文件描述符數量有限制,默認為1024.
下面,大家看看select的api,大家也可以自行在linux機器上執行:man select 查看。
select, pselect, FD_CLR, FD_ISSET, FD_SET, FD_ZERO - synchronous I/O multiplexing
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
上面的第一行,是select的簡單說明,其中一個詞是,synchronous,同步的意思,說明select是同步的,不是異步的,只是進行了io多路復用。
下面那個是其api,簡單解釋三個參數:
-
參數fd_set *readfds
Those listed in readfds will be watched to see if characters become available for reading (more
precisely, to see if a read will not block也就是說,這個集合中的fd,會被監測,看看哪些fd可以無阻塞地讀取;怎麼才能無阻塞地讀取,那肯定是這個fd的輸入緩衝區有內容啊,比如,客戶端發了數據過來
-
參數fd_set *writefds
those in writefds will be watched to see if a write will not block
這個集合,會被監測,看看是否可以對這個fd,進行無阻塞地寫;什麼時候,不能無阻塞地寫呢?肯定是緩衝區滿了的時候。這種應該常見於:給對端發數據時,對方一直不ack這些數據,所以我方的緩衝區里,一直不能刪這些數據,導致緩衝區滿。
-
struct timeval *timeout
The timeout argument specifies the minimum interval that select() should block waiting for a file descriptor to become ready. If both fields of the timeval structure are zero, then select() returns immediately. (This is useful for polling.) If timeout is NULL (no timeout), select() can block
indefinitely.這個timeout參數,指定了select()操作,等待文件描述符變成ready過程中,需要等待多長時間。如果這個timeout的兩個字段,都被設為了0,則select()會馬上返回。如果timeout是null,這個操作會無限阻塞。
所以,select我就算大家了解了,其中的timeout參數,簡單來說,就是調用select時,最大阻塞多久就要返回。
如果設為0,則馬上返回;如果為null,則無限阻塞;如果為正常的大於0的值,則阻塞對應的時長。
和前面的部分,聯繫起來,就是說:
- 假設沒有周期任務,則,無限阻塞;
- 如果有周期任務,且時間已經到達,則馬上返回;
- 如果有周期任務,且時間未到,則阻塞對應時長後返回。
linux下不是用epoll嗎,為啥還講select
有的函數,天生適合拿來講課。epoll,kqueue等,會單獨拿來講。
獲取到ready的文件描述符後,處理該文件描述符
// 1 處理文件事件,阻塞時間由 tvp 決定,tvp:timevalue pointer
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
// 2 從已就緒數組中獲取事件
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int rfired = 0;
// 讀事件
if (fe->mask & mask & AE_READABLE) {
// rfired 確保讀/寫事件只能執行其中一個
rfired = 1;
fe->rfileProc(eventLoop, fd, fe->clientData, mask);
}
// 寫事件
if (fe->mask & mask & AE_WRITABLE) {
if (!rfired || fe->wfileProc != fe->rfileProc)
fe->wfileProc(eventLoop, fd, fe->clientData, mask);
}
processed++;
}
-
1處,這裡就會根據當前的操作系統,決定調用select或是epoll,或是其他的實現。(通過條件編譯實現)。
假設這裡的底層實現,就是前面講的select函數,那麼,select函數執行完後,eventLoop->fired 屬性,就會存放這次select篩選出來的那些,ready的文件描述符集合。
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) { aeApiState *state = eventLoop->apidata; int retval, j, numevents = 0; /** * 拷貝到帶_的變量中 */ memcpy(&state->_rfds,&state->rfds,sizeof(fd_set)); memcpy(&state->_wfds,&state->wfds,sizeof(fd_set)); // 1 retval = select(eventLoop->maxfd+1, &state->_rfds,&state->_wfds,NULL,tvp); if (retval > 0) { for (j = 0; j <= eventLoop->maxfd; j++) { int mask = 0; aeFileEvent *fe = &eventLoop->events[j]; if (fe->mask == AE_NONE) continue; if (fe->mask & AE_READABLE && FD_ISSET(j,&state->_rfds)) mask |= AE_READABLE; if (fe->mask & AE_WRITABLE && FD_ISSET(j,&state->_wfds)) mask |= AE_WRITABLE; // 2 eventLoop->fired[numevents].fd = j; eventLoop->fired[numevents].mask = mask; numevents++; } } return numevents; }
如上所示,1處,調用select;2處,賦值給fired。
-
2處,從fired中取出對應的文件描述符
-
3處,如果fired中的文件描述符,可讀,則執行對應的函數指針rfileProc指向的函數
-
4處,如果fired中的文件描述符,可寫,則執行對應的函數指針wfileProc指向的函數
如果有周期任務,則執行周期任務
/* Check time events */
// 執行時間事件
if (flags & AE_TIME_EVENTS)
processed += processTimeEvents(eventLoop);
這裡會調用processTimeEvents,其實現如下,其中涉及到複雜的時間計算,我們可以只看核心流程:
/* Process time events
*
* 處理所有已到達的時間事件
*/
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te;
long long maxId;
time_t now = time(NULL);
// 更新最後一次處理時間事件的時間
eventLoop->lastTime = now;
// 遍歷鏈表
// 執行那些已經到達的事件
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId - 1;
while (te) {
long now_sec, now_ms;
long long id;
// 獲取當前時間
aeGetTime(&now_sec, &now_ms);
// 如果當前時間等於或等於事件的執行時間,那麼說明事件已到達,執行這個事件
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms)) {
int retval;
id = te->id;
//1 執行事件處理器,並獲取返回值
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
// 記錄是否有需要循環執行這個事件時間
if (retval != AE_NOMORE) {
// 2 是的, retval 毫秒之後繼續執行這個時間事件
aeAddMillisecondsToNow(retval, &te->when_sec, &te->when_ms);
} else {
// 不,將這個事件刪除
aeDeleteTimeEvent(eventLoop, id);
}
// 因為執行事件之後,事件列表可能已經被改變了
// 因此需要將 te 放回表頭,繼續開始執行事件
te = eventLoop->timeEventHead;
} else {
te = te->next;
}
}
return processed;
}
-
1處,執行timeProc這個函數指針,執行的函數,在初始化的時候,這個指針,被賦值為serverCron;
初始化時,會調用一下代碼:
// 為 serverCron() 創建時間事件 if (aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) { redisPanic("Can't create the serverCron time event."); exit(1); }
這裡的serverCron,是一個函數指針。
long long aeCreateTimeEvent(aeEventLoop *eventLoop, long long milliseconds, aeTimeProc *proc, void *clientData, aeEventFinalizerProc *finalizerProc) { // 更新時間計數器 long long id = eventLoop->timeEventNextId++; // 創建時間事件結構 aeTimeEvent *te; te = zmalloc(sizeof(*te)); if (te == NULL) return AE_ERR; // 設置 ID te->id = id; // 設定處理事件的時間 aeAddMillisecondsToNow(milliseconds, &te->when_sec, &te->when_ms); // 1 設置事件處理器 te->timeProc = proc; te->finalizerProc = finalizerProc; // 設置私有數據 te->clientData = clientData; // 將新事件放入表頭 te->next = eventLoop->timeEventHead; eventLoop->timeEventHead = te; return id; }
上面的1處,將傳入的serverCron,賦值給了te->timeProc。
-
2處,註冊下一次的周期任務
總結
本講主要講解了主循環的最外層結構,如果有什麼不清楚的,可以留言。