Openresty Lua協程調度機制

寫在前面

OpenResty(後面簡稱:OR)是一個基於Nginx和Lua的高性能Web平台,它內部集成大量的Lua API以及第三方模塊,可以利用它快速搭建支持高並發、極具動態性和擴展性的Web應用、Web服務或動態網關。

OR最大的特點就是,將Lua協程與Nginx事件驅動模型及非阻塞I/O結合起來。使用戶可以在handler中使用 同步但是依然是非阻塞 的方式編寫其應用代碼,而無需關心底層的協程調度以及與Nginx事件驅動模型的交互。

本文將先從總體上介紹OR的協程調度機制,然後結合源碼以及Lua棧的情況來詳細了解各個部分是如何實現的,包括其異常保護、協程初始化、協程的恢復和執行、協程的掛起、協程的執行結束、協程出錯的情況。

本文主要關注調度函數內部的邏輯,如果想了解外部的調用流程。可以參看Openresty Lua鉤子調用完整流程

註:lua-nginx模塊與stream-lua-nginx模塊的主體部分類似,後者實現相對簡單一點。下面的討論將基於stream-lua模塊。

為了防止歧義,文中用到的一些術語明確一下:

  • 主線程:表示外層調用run_thread()的OS線程
  • 入口線程:每個handler被調用時會創建一個入口線程,用於執行lua代碼
  • 用戶線程:用戶在Lua代碼中通過ngx.thread.spawn()創建的線程
  • 用戶協程:用戶在Lua代碼中通過coroutine.create()創建的協程
  • 協程:泛指所有協程,包括入口線程、用戶線程和用戶協程
  • vm:表示Lua虛擬機
  • L:視出現的上下文,一般表示父協程,在創建入口線程的時候表示Lua VM
  • co:一般表示新創建的協程
  • L棧: |協程表|新協程|頂|:表示Lua棧結構,最右邊是棧頂

關鍵數據結構

在深入了解協程調度機制之前,我們先來認識一下主要的數據結構:

  • 協程上下文:ngx_stream_lua_co_ctx_t
    • 協程內部棧(coctx->co
    • 協程狀態(coctx->co_status
    • 維護協程之間關係的數據(父協程coctx->parent_co_ctx、殭屍子線程coctx->zombie_child_threads
    • 用戶相關數據(coctx->data
    • 在Lua的registry表中對應該線程指針的引用值(co_ref
    • 一些狀態標記(是否是用戶線程is_uthread、是否因創建新線程thread_spawn_yielded被yield)
  • 模塊上下文:ngx_stream_lua_ctx_t
    • ctx->cur_co_ctx(當前調度協程上下文)
    • ctx->co_op(協程是以何種方式YIELD)
  • 核心調度函數:ngx_stream_lua_run_thread()

協程調度

首先你可能很好奇OR為什麼要在C引擎層面自己實現協程的調度?或者說這麼做的好處是什麼?我覺得最主要的原因還是減輕開發者的負擔。

原生Lua coroutine接口

我們知道Lua是個非常輕巧的語言,它不像Go有自己的調度器。Lua原生的對協程的操作無非就是coroutine.resume()coroutine.yield()。這兩者是成對出現的,協程coroutine.yield()之後肯定回到父協程coroutine.resume()的地方,恢復子協程需要顯式再次coroutine.resume()。如果要在Lua代碼層面實現非阻塞I/O,那麼父協程必須處理子協程I/O等待的情況,並在事件發生時恢復子協程的執行。如果需要同時進行多個任務,那麼父協程就需要負責多個協程間的調度。因為協程的拓撲可能是一個複雜的樹狀結構,所以協程的調度管理將變得異常複雜。

OpenResty實現

OR在C引擎層幫我們把這些事情都做了,你無須再關心所有這些,只需專心寫你的業務邏輯。為了支持同步非阻塞的方式編寫應用代碼,OR重寫了coroutine的接口函數,從而接管了協程的調度,並在coroutine基礎上封裝抽象出了thread的概念。無論是coroutine還是thread,I/O等待對於用戶都是透明的,用戶無需關心。兩者的主要區別是,coroutine父子之間的協作度更高,coroutine.yield()coroutine.resume()成對出現。在子協程執行完成(出錯)或者顯式coroutine.yield()之前,父協程一直處於等待狀態。而thread則由調度器進行調度,子thread一旦開始執行就不再受父協程控制了,在需要並發請求時很有用。thread提供了spawn()wait()等接口,spawn()執行參數中指定的函數,直到執行完畢、出錯或者I/O等待時返回。wait()則使父協程可以同步等待子線程執行完畢、獲取結果。

OR在對協程調度上,最核心的改動是其創建新協程時的行為(coroutine.resume(), ngx.thread.spawn())。它不會直接調用lua_resume(),而是先lua_yield()回到主線程,然後由主線程再根據情況lua_resume()下一個協程。Lua代碼域內從來不會直接調用lua_resume(),理解了這一點你就理解了OpenResty協程調度的精髓。

所以OR中協程拓撲是一個單層的結構,它只有一個入口點。這樣使得協程調度更加靈活,I/O事件的觸發時回調函數也更容易實現。

OR調度器根據lua_resume()的返回值,確定協程是掛起了、結束了還是出錯了。因為OR改動了創建新協程時行為,同時又抽象了thread概念,所以如果是協程掛起的情況,還需要知道是什麼原因掛起,以便做相應的不同處理。是繼續調度?還是返回上層?我們前面提到的ctx->co_op便是做這個用途。

協程的調度在核心調度函數ngx_stream_lua_run_thread()中進行,它是創建或恢復協程的唯一入口點。最初是由配置的Lua鉤子調用(圖中ssl_cert_handler()),如果碰到了I/O等待的情況,後續則由對應的事件handler(圖中的sleep_handler()read_handler())再次拉起。run_thread()裏面實現了一個調度循環,循環裏面先從ctx->cur_co_ctx獲取下一個待resume的協程上下文,然後lua_resume()執行或恢復該協程,其返回值LUA_YIELD表示協程掛起,0表示協程執行結束,其餘的表示協程出錯了。其中協程掛起又分為四種不同的情況:即等待I/O、新建thread、coroutine.resume()coroutine.yield()。根據不同的情況,決定是跳到循環前面繼續恢復下一個協程,還是返回上層函數。

下圖是協程調度主要邏輯的示意圖,可以看到在Lua代碼域中無論是新建、掛起或恢復協程,都是先調用lua_yield()回到主線程。I/O操作例如ngx.tcp.receive()如果碰到了I/O等待,會在內部註冊epoll事件(對於sleep的情況是定時器),然後自動lua_yield(),當事件觸發時繼續未完成的I/O操作,完成之後再調用run_thread()恢復之前被掛起的協程。

openresty-lua-coroutine-schedule

異常保護

作為一個調度器,OpenResty扮演者類似操作系統內核的角色,不過它的調度對象是Lua協程。作為一個「內核」,無論其調度對象出了什麼問題,都不應該使這個系統崩潰,而是應該將錯誤信息打印出來。

Openresty內部就做了一個這樣的異常保護,其原理就是用setjmplongjmp包住了run_thread()裏面的整個協程調度邏輯。

/* 首先註冊虛擬機的panic回調 */
lua_atpanic(L, ngx_stream_lua_atpanic);
/* setjmp保存環境 */
NGX_LUA_EXCEPTION_TRY {
    /* 執行調度邏輯 */
} NGX_LUA_EXCEPTION_CATCH {
    /* 出現異常時走到這裡 */
    dd("nginx execution restored");
}

ngx_stream_lua_atpanic()的實現也非常簡單,只是簡單地打印崩潰日誌,然後調用NGX_LUA_EXCEPTION_THROW(1);恢復nginx的執行。

int
ngx_stream_lua_atpanic(lua_State *L)
{
#ifdef NGX_LUA_ABORT_AT_PANIC
    abort();
#else
    u_char                  *s = NULL;
    size_t                   len = 0;

    if (lua_type(L, -1) == LUA_TSTRING) {
        s = (u_char *) lua_tolstring(L, -1, &len);
    }

    if (s == NULL) {
        s = (u_char *) "unknown reason";
        len = sizeof("unknown reason") - 1;
    }

    ngx_log_stderr(0, "lua atpanic: Lua VM crashed, reason: %*s", len, s);
    ngx_quit = 1;

    /*  restore nginx execution */
    NGX_LUA_EXCEPTION_THROW(1);

    /* impossible to reach here */
#endif
}

這幾個宏定義分別如下:

#define NGX_LUA_EXCEPTION_TRY                                                \
    if (setjmp(ngx_stream_lua_exception) == 0)

#define NGX_LUA_EXCEPTION_CATCH                                              \
    else

#define NGX_LUA_EXCEPTION_THROW(x)                                           \
    longjmp(ngx_stream_lua_exception, (x))

協程初始化

鉤子的入口線程

ngx_stream_lua_new_thread()用於創建入口線程

OR中需要在Registry表中存儲每個創建出來的Lua線程的reference,這個存儲協程的表在Registry表中對應的key是全局變量ngx_stream_lua_coroutines_key的地址,因此下面這段代碼就是從Registry表中查詢這個儲存協程的表,返回到棧頂:

/* 返回棧頂元素的索引,等於棧中元素的個數 */
base = lua_gettop(L);
/* 將存儲協程的表對應的key壓棧 */
lua_pushlightuserdata(L, ngx_stream_lua_lightudata_mask(
                      coroutines_key));
/* 將key出棧,獲取Registry表中key對應的元素,然後將結果入棧 */
lua_rawget(L, LUA_REGISTRYINDEX);

接下來創建一個新的協程,同時初始化其全局表:

/* 創建Lua協程,返回的新lua_State跟原有的lua_State共享所有的全局對象(如表),
   但是有一個獨立的執行棧。 協程依賴垃圾回收銷毀 */
/* L棧: |協程表|新協程|頂| */
co = lua_newthread(L);  
/* 創建該協程的全局表,設置_G field為全局表自己 */
/* L棧: |協程表|新協程|協程新的全局表|頂| */
ngx_stream_lua_create_new_globals_table(co, 0, 0); 
/* 再創建一個新表 */
/* L棧: |協程表|新協程|協程新的全局表|新表|頂| */
lua_createtable(co, 0, 1);  
/* 拿到全局表 */
/* L棧: |協程表|新協程|協程新的全局表|新表|舊全局表|頂| */
ngx_stream_lua_get_globals_table(co);   
/* 新表的__index的值為棧頂的值,也即就全局表 */
/* L棧: |協程表|新協程|協程新的全局表|新表|頂| */
lua_setfield(co, -2, "__index");    
/* 新表出棧,將其設為索引-2處即協程新的全局表的元表 */
/* L棧: |協程表|新協程|協程新的全局表|頂| */
lua_setmetatable(co, -2);
/* 設置協程新的全局表到對應索引,其_G field是自己,
   其元表是新表,新表的__index是父協程的全局表 */
/* L棧: |協程表|新協程|頂| */
ngx_stream_lua_set_globals_table(co);

這一塊的邏輯有點繞,我們來稍微理一下,其實就是用新建的全局表替換了舊的全局表,其中新的全局表的_G字段是它自己,新全局表的元表中__index元方法是舊的全局表。

此時的Lua虛擬機棧頂情況如下圖所示:

L->top      |   棧頂    |
L->top - 1  |Lua_State*|    新創建的協程
L->top -2   | Lua Table|    存儲協程引用的表

下面一步就是在Lua虛擬機中為這個新協程創建一個reference:

/* 為棧頂對象(即新協程),創建並返回一個協程表中的引用 */
/* 當前棧: |協程表|頂| */
*ref = luaL_ref(L, -2);
if (*ref == LUA_NOREF) {
    lua_settop(L, base);  /* restore main thread stack */
    return NULL;
}

最後恢復堆棧

/* 設置棧頂索引 */
/* 當前棧: |頂| */
lua_settop(L, base);    
return co;

以上步驟還只是創建了一個什麼都不能做的Lua協程,回到_by_chunk()函數之後還需要把入口函數放入協程中。

/* 將lua虛擬機VM棧上的入口函數閉包移到新創建的協程棧上,
   這樣新協程就有了虛擬機已經解析完畢的代碼了。*/
lua_xmove(L, co, 1);

/* 拿到co全局表,放到棧頂 */
/* 當前棧: |入口closure|全局表|頂| */
ngx_stream_lua_get_globals_table(co);
/* 將全局表設為入口closure的環境表 */
/* 當前棧: |入口closure|頂|*/
lua_setfenv(co, -2);

至此,協程入口函數以及環境表已經設置好。接下來就是讓它能夠運行起來,讓調度器能夠調度它運行:

/* 將nginx請求保存到協程全局表 */
ngx_stream_lua_set_req(co, r);

ctx->cur_co_ctx = &ctx->entry_co_ctx;
ctx->cur_co_ctx->co = co;
ctx->cur_co_ctx->co_ref = co_ref;

接下來就是註冊cleanup鉤子,然後ngx_stream_lua_run_thread()

用戶創建的uthread

用戶線程由ngx.thread.spawn()創建,對應的C實現是ngx_stream_lua_uthread_spawn()。首先它會調ngx_stream_lua_coroutine_create_helper()創建一個新的協程。

創建協程

注意協程都是在worker的虛擬機上創建的(不考慮cache off的情況的話)。但是用戶協程會繼承父協程的全局表,其父子關係由OR進行維護。

/* 獲取虛擬機 */
vm = ngx_stream_lua_get_lua_vm(r, ctx);
/* 創建新協程 */
co = lua_newthread(vm);
/* 然後創建coctx,設置其co、co_status值 */
coctx = ngx_stream_lua_create_co_ctx;
coctx->co = co;
coctx->co_status = NGX_STREAM_LUA_CO_SUSPENDED;

此時父協程的棧如下:

/* 當前棧: |entry_func|args|頂| */

接下來將父協程的全局表給新創建的協程:

/* make new coroutine share globals of the parent coroutine.
 * NOTE: globals don't have to be separated! */
/* 拷貝父協程的全局表到棧上 */
/* L棧: |entry_func|args|全局表|頂| */
ngx_stream_lua_get_globals_table(L);
/* 將全局表移動到新創建的協程co的棧上 */
/* L棧: |entry_func|args|頂| */
lua_xmove(L, co, 1);
/* 從新協程棧上寫入其的全局表 */
ngx_stream_lua_set_globals_table(co);

/* 將新協程從進程虛擬機,移動到父協程中 */
/* L棧: |entry_func|args|新協程|頂| */
lua_xmove(vm, L, 1);
/* 入口函數拷貝到L棧頂 */
/* L棧: |entry_func|args|新協程|entry_func|頂|*/
lua_pushvalue(L, 1); 
/* 將入口函數從L移到co棧中 */
/* L棧: |entry_func|args|新協程|頂| */
/* co棧: |entry_func|頂|*/
lua_xmove(L, co, 1);  

create_helper函數返回之後,L的棧頂是新協程,co的棧頂是入口函數。

初始化uthread

ngx_stream_lua_coroutine_create_helper返回之後,進行uthread的初始化。

此時,父協程L是這樣的:

  • 棧頂是新創建的協程
  • 然後是參數和入口函數

在此之前,先在registry表中保存一個該協程的ref。(到現在還沒搞明白這個ref是幹嘛用的?除了創建線程和刪除線程,貌似只有檢查線程是否活着的時候會查一下這個ref,只是檢查狀態用coctx->co_status不是也能做到么?8.12更新,之所以要把線程錨定到註冊表上,是為了防止被當成垃圾回收。這也解釋了為什麼只有線程需要錨定到註冊表上,而用戶協程不需要。因為用戶協程肯定由其父協程保留着一個引用。)

/* anchor the newly created coroutine into the Lua registry */
/* 把新創建的協程寫入Lua registry表中 */
/* 將ngx_stream_lua_coroutines_key的地址壓入棧中 */
lua_pushlightuserdata(L, &ngx_stream_lua_coroutines_key);
/* 從registry表中獲取協程表 */
/* L棧: |entry_func|args|新協程|協程表|頂|*/
lua_rawget(L, LUA_REGISTRYINDEX);
/* 將新協程壓棧 */
/* L棧: |entry_func|args|新協程|協程表|新協程|頂|*/
lua_pushvalue(L, -2);
/* -2位置是註冊表,為新協程創建在報表中的索引 */
/* L棧: |entry_func|args|新協程|協程表|頂| */
coctx->co_ref = luaL_ref(L, -2);    // 
/* 彈出協程表 */
/* L棧: |entry_func|args|新協程|頂| */
lua_pop(L, 1);

接下來是初始化運行環境:

此時的,L的棧情況如下:

     |entry_func|參數1|...|參數n|新協程|
         1        2   ...  -2    -1
if (n > 1) {
    /* 由於lua函數壓棧順序是從左到右
     * 因此1就是壓入的第一個參數,而spawn的第一個參數就是入口函數
     * 把棧頂元素(即新協程)移動到1,覆蓋入口函數,入口函數前面已經拷貝到新協程棧上了
     */
    /* L棧: |新協程|args|頂| */
    lua_replace(L, 1);
    /* 將參數移到新協程棧中 */
    /* L棧: |新協程|頂|*/
    /* co棧: |入口函數|args|頂| */
    lua_xmove(L, coctx->co, n - 1);
}

設置狀態,將父協程放入post_thread隊列中,設置協程的父子關係,設置新協程為下一個調度的線程

/* 設置狀態 */
coctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
ctx->co_op = NGX_STREAM_LUA_USER_THREAD_RESUME;
ctx->cur_co_ctx->thread_spawn_yielded = 1;

/* 將父協程放入post_thread隊列中 */
ngx_stream_lua_post_thread(r, ctx, ctx->cur_co_ctx)

/* 保存子線程的父協程上下文為當前協程 */
coctx->parent_co_ctx = ctx->cur_co_ctx;
/* 切換當前協程為新創建的協程 */
ctx->cur_co_ctx = coctx;

最後,spawn函數的返回值是新創建的協程

/* 將原協程的執行權切換出去,這裡的參數1表示棧上留了一個值,這裡是指新創建的協程
 * 主線程並不會取這個值,而是等到新線程spawn返回時作為返回值。
 * 此時L棧中是新協程,co棧中是參數和入口函數。
 */
return lua_yield(L, 1);

用戶創建的coroutine

OR替換了原生的coroutine接口,當存在getfenv(0).__ngx_req時(全局環境保存了nginx請求),使用重寫後的coroutine接口函數。

coroutine.create()創建新協程部分跟uthread是一樣的,都是調用ngx_stream_lua_coroutine_create_helper()。Lua函數返回新協程。此時新協程棧中是入口函數。

coroutine.resume()用於開始或恢復新協程,其對應的C函數是ngx_http_lua_coroutine_resume()

/* 首先,獲取到協程 */
/* L棧: |co|參數|,  co棧: |入口函數| */
co = lua_tothread(L, 1);

/* 然後設置狀態和父子關係 */
/* 父協程為normal */
p_coctx->co_status = NGX_HTTP_LUA_CO_NORMAL;

coctx->parent_co_ctx = p_coctx;

dd("set coroutine to running");
/* 子協程為running */
coctx->co_status = NGX_HTTP_LUA_CO_RUNNING;

/* 設置co_op告知主線程yield類型 */
ctx->co_op = NGX_HTTP_LUA_USER_CORO_RESUME;
/* 設置下一個調度協程為新協程 */
ctx->cur_co_ctx = coctx;

接下來,將控制權交還給主協程,並把參數傳給主線程。

/* 此時L棧: |co|參數|, co棧: |入口函數| */
/* lua_gettop(L) - 1表示留在棧中的返回值個數,
 * 由主線程取用之後,在lua_resume新協程時傳遞 */
/* 減一個,表示不傳底下的co */
return lua_yield(L, lua_gettop(L) - 1);

協程執行和恢復

OR中協程的執行和恢復總是由主線程來進行,不管是coroutine.resume()還是ngx.thread.spawn(),都是先lua_yield()回到主線程之後,在主線程中lua_resume()

注意到前面創建階段,thread是lua_yield(L, 1),coroutine是lua_yield(L, lua_gettop(L) - 1)。yield到主線程之後,我們繼續看調度程序的處理。

uthread

先獲取參數個數

/* 因為入口函數和參數已經在新線程棧中了,所以從新協程中獲取參數個數,-1是除掉入口函數 */
nrets = lua_gettop(ctx->cur_co_ctx->co) - 1;    

然後跳到主循環的前面,執行新線程

/* 保存新協程coctx */
orig_coctx = ctx->cur_co_ctx;
/* 執行新線程,其中nrets為參數個數 */
rv = lua_resume(orig_coctx->co, nrets);

lua_resume中就會開始新線程的執行。當新線程執行完畢或因I/O中斷yield之後,會恢復父協程。在恢復父協程之前,先設置參數個數為1,即之前留在棧上的新協程co。恢復父協程之後,ngx.thread.spawn()函數就返回了。

if (ctx->cur_co_ctx->thread_spawn_yielded) {
    ctx->cur_co_ctx->thread_spawn_yielded = 0;
    nrets = 1;
}

coroutine

同樣是先獲取參數個數

/* 獲取父協程 */
old_co = ctx->cur_co_ctx->parent_co_ctx->co;
/* 因為參數還在父協程棧中,所以從父協程棧中獲取參數個數 */
nrets = lua_gettop(old_co);
if (nrets) {
    /* 將參數從父協程移到子協程 */
    lua_xmove(old_co, ctx->cur_co_ctx->co, nrets);
}

此時子協程棧中是參數和入口函數。

然後跳到主循環的前面,執行新協程,跟前面uthread時一樣。

協程掛起

協程的掛起分為兩種情況:

  • 一種是內部在I/O等待時自動掛起,這種情況用戶不用參與,OR會自動將相應的事件及其handler掛到事件驅動上,當事件被喚醒時繼續未完成的I/O操作,完成之後由調度器恢復之前掛起的協程。
  • 另一種是用戶在Lua代碼主動調用coroutine.yield()掛起。此時由調度器根據情況決定執行下一個執行的協程。

顯式主動掛起

我們先來看用戶主動掛起的情況,coroutine.yield()對應的C函數為ngx_stream_lua_coroutine_yield()。我們先來看看它裏面幹了些什麼。

/* 首先修改當前協程的狀態為掛起 */
coctx = ctx->cur_co_ctx;
coctx->co_status = NGX_STREAM_LUA_CO_SUSPENDED;
/* 設置co_op */
ctx->co_op = NGX_STREAM_LUA_USER_CORO_YIELD;

/* 如果不是用戶線程(也即是普通coroutine),且有父協程,
   將其父協程狀態設置為running */
if (!coctx->is_uthread && coctx->parent_co_ctx) {
    coctx->parent_co_ctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
}

/* 最後將控制權交還主線程,將所有yield參數傳遞給主線程 */
return lua_yield(L, lua_gettop(L));

回到主線程之後,根據待掛起協程是thread還是corotine進行不同處理。

thread

if (ngx_stream_lua_is_thread(ctx)) {
    /* 丟棄coroutine.yield()的任何參數 */
    lua_settop(ctx->cur_co_ctx->co, 0);
    /* 因為thread由調度器負責調度,所以將當前線程的狀態改為running,為什麼不在前面一起改?*/
    ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
    /* 如果已經有pending的線程,則放到隊列中 */
    if (ctx->posted_threads) {
        ngx_stream_lua_post_thread(r, ctx, ctx->cur_co_ctx);
        ctx->cur_co_ctx = NULL;
        return NULL;
    }
    /* 否則,立即恢複線程 */
}

coroutine

/* 獲取當前棧的高度,也即coroutine.yield()的參數個數 */
nrets = lua_gettop(ctx->cur_co_ctx->co);
/* 設置父協程為下一個調度的協程 */
next_coctx = ctx->cur_co_ctx->parent_co_ctx;
next_co = next_coctx->co;
/* 將參數從子協程棧中移到父協程棧中 */
if (nrets) {
    dd("moving %d return values to next co", nrets);
    lua_xmove(ctx->cur_co_ctx->co, next_co, nrets);
#ifdef NGX_LUA_USE_ASSERT
    ctx->cur_co_ctx->co_top -= nrets;
#endif

}
/* 如果不是wrap封裝的,還要加一個true,作為第一個參數 */
if (!ctx->cur_co_ctx->is_wrap) {
    /* prepare return values for coroutine.resume
     * (true plus any retvals)
     */
    lua_pushboolean(next_co, 1);
    /* 插入1的位置,作為第一個參數 */
    lua_insert(next_co, 1);
    nrets++; /* add the true boolean value */
}

ctx->cur_co_ctx = next_coctx;
/* 回到主循環的前面,resume父協程 */
break;

I/O等待場景

I/O等待的場景有很多,不過其背後的原理都差不多:

  • 定義一個事件,設置恢復時的handler及對應協程上下文,然後lua_yield()回到run_thread()
  • 主線程將ctx->cur_co_ctx設為空之後,直接返回NGX_AGAIN,如果有posted_thread會繼續執行,否則將控制權交還給nginx層
  • 後續當事件發生時,繼續未完成的操作,完成之後將保存的協程上下文設為ctx->cur_co_ctx,然後調用ngx_stream_lua_run_thread()恢復協程的執行。

這裡舉兩個典型的例子:

ngx.sleep()

它的C函數實現是ngx_stream_lua_ngx_sleep(),先定義設置好handler和coctx,掛上定時器,然後lua_yield()

    ngx_stream_lua_cleanup_pending_operation(coctx);
    coctx->cleanup = ngx_stream_lua_sleep_cleanup;
    coctx->data = r;

    /* 保存恢復時的handler和協程上下文 */
    coctx->sleep.handler = ngx_stream_lua_sleep_handler;
    coctx->sleep.data = coctx;
    coctx->sleep.log = r->connection->log;

    /* 當delay為0時,放入post_event隊列或添加定時器 */
    if (delay == 0) {
#ifdef HAVE_POSTED_DELAYED_EVENTS_PATCH
        dd("posting 0 sec sleep event to head of delayed queue");

        coctx->sleep.delayed = 1;
        ngx_post_event(&coctx->sleep, &ngx_posted_delayed_events);
#else
        ngx_log_error(NGX_LOG_WARN, r->connection->log, 0, "ngx.sleep(0)"
                      " called without delayed events patch, this will"
                      " hurt performance");
        ngx_add_timer(&coctx->sleep, (ngx_msec_t) delay);
#endif

    } else {    /* 添加定時器 */
        ngx_add_timer(&coctx->sleep, (ngx_msec_t) delay);
    }
    /* 外層函數*/
    return lua_yield(L, 0);

run_thread()里將當前協程上下文置為NULL,然後返回NGX_AGAIN

by_chunk()里會先檢查有沒有在post隊列里的線程,如果沒有則返回

    rc = ngx_stream_lua_run_thread(L, r, ctx, 0);

    if (rc == NGX_ERROR || rc >= NGX_OK) {
        /* do nothing */

    } else if (rc == NGX_AGAIN) {
        rc = ngx_stream_lua_content_run_posted_threads(L, r, ctx, 0);

    } else if (rc == NGX_DONE) { /* 這裡DONE的情況只有HTTP子請求的時候會出現 */
        rc = ngx_stream_lua_content_run_posted_threads(L, r, ctx, 1);

    } else {
        rc = NGX_OK;
    }

當定時器超時時,它會執行sleep_handler(),設置ctx->cur_co_ctx然後執行run_thread()恢復協程調度。

ngx.tcp.receive()

其對應的C函數實現是ngx_stream_lua_socket_tcp_receive(),裏面會調ngx_stream_lua_socket_tcp_receive_helper()。碰到讀等待的情況,也是先設置好handler和coctx,然後lua_yield()。我們來看下裏面代碼:

    /* 這裡0表示還未進行協程切換 */
    u->read_waiting = 0;
    u->read_co_ctx = NULL;

    /* 讀取的主要邏輯由此函數處理 */
    rc = ngx_stream_lua_socket_tcp_read(r, u);
    /* 不管是成功、出錯或等待I/O,肯定會返回 */
    if(rc == NGX_ERROR) {
        /*...*/
    }
    if(rc == NGX_OK) {
        /*...*/
    }

    /* rc == NGX_AGAIN */
    /* 如果是等待I/O的情況,設置事件觸發時的handler、當前協程上下文 */
    u->read_event_handler = ngx_stream_lua_socket_read_handler;
    coctx = lctx->cur_co_ctx;

    /* 設置請求的寫事件handler,這個是返回到Lua層前調用的handler */
    r->write_event_handler = ngx_stream_lua_content_wev_handler;

    /* 保存當前協程上下文到u上 */
    u->read_co_ctx = coctx;
    /* 表示是後續是需要協程恢復的 */
    u->read_waiting = 1;
    /* 設置準備返回值的回調 */
    u->read_prepare_retvals = ngx_stream_lua_socket_tcp_receive_retval_handler;

    return lua_yield(L, 0);

回到run_thread(),同樣是將當前協程上下文置為NULL,然後返回NGX_AGAIN

當事件被觸發時,執行前面設置的ngx_stream_lua_socket_read_handler(),裏面又會調用讀取操作核心函數ngx_stream_lua_socket_tcp_read()。如果繼續碰到等待I/O,handler直接結束,等待下一次事件。如果是完成或出錯,會執行如下操作:

/* 恢復該值為0 */
u->read_waiting = 0;
/* 獲取協程上下文 */
coctx = u->read_co_ctx;

/* 設置協程恢復的handler */
ctx->resume_handler = ngx_stream_lua_socket_tcp_read_resume;
/* 設置下一個調度的上下文,為之前調用讀取操作的協程 */
ctx->cur_co_ctx = coctx;

/* 這個handler就是yield之前設置的那個,它裏面調用 ctx->resume_handler */
r->write_event_handler(r);  

r->write_event_handler(r);是返回Lua層前調用的handler,裏面會調用resume_handlerngx_stream_lua_socket_tcp_read_resume()只是封裝了一下,最終都是調用的ngx_stream_lua_socket_tcp_resume_helper(),我們看來下它的代碼:

/* 待恢復協程上下文 */
coctx = ctx->cur_co_ctx;

u = coctx->data;
prepare_retvals = u->read_prepare_retvals;
/* 準備返回值 */
nret = prepare_retvals(r, u, ctx->cur_co_ctx->co);

/* 恢復協程調度,回到Lua層 */
rc = ngx_stream_lua_run_thread(vm, r, ctx, nret);

至於完成的條件,取決與不同的調用方式。如果是讀取固定位元組數的話,會維護一個剩餘待讀取的位元組數u->rest。如果是讀取一行,則讀取到\n就結束。如果是readall,則一直讀到u->eof為止。

協程執行完畢

為了不失完整性,再說一下正常結束和出錯時的情況。正常執行完畢時,會設置協程狀態,然後清理它的殭屍子線程:

/* 將當前協程狀態置為DEAD */
ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_DEAD;
/* 如果子線程有殭屍線程,則清理之 */
if (ctx->cur_co_ctx->zombie_child_threads) {
    ngx_stream_lua_cleanup_zombie_child_uthreads(
        r, L, ctx, ctx->cur_co_ctx);
}

接下來,根據結束的協程的類型不同執行不同的操作:

入口線程

此時直接刪除線程即可,然後根據是否還有用戶線程,選擇返回NGX_AGAINNGX_OK

if (ngx_stream_lua_is_entry_thread(ctx)) {
    /* 將虛擬機棧清空 */
    lua_settop(L, 0);
    /* 刪除當前線程,會從REGISTY表中解引用當前協程的`coctx->co_ref` */
    ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);

    /* 如果還有其他用戶線程,返回NGX_AGAIN */
    if (ctx->uthreads) {
        ctx->cur_co_ctx = NULL;
        return NGX_AGAIN;
    }

    /* all user threads terminated already */
    goto done;      /* 到這就圓滿結束了 return NGX_OK; */
}

用戶線程

此時如果父協程已經死了,處理方式跟入口線程一樣,即刪除線程,然後根據是否還有任何用戶線程或入口線程,選擇返回NGX_AGAINNGX_OK

如果父協程還活着,並且已經在wait它了,直接恢復父協程。否則,加入到父協程的殭屍線程列表中。

if (ctx->cur_co_ctx->is_uthread) {
    /* 清空虛擬機棧 */
    lua_settop(L, 0); 
    /* 獲取父協程 */
    parent_coctx = ctx->cur_co_ctx->parent_co_ctx;
    /* 如果父協程還活着 */
    if (ngx_stream_lua_coroutine_alive(parent_coctx)) {
        /* 並且在wait當前線程,則恢復父協程 */
        if (ctx->cur_co_ctx->waited_by_parent) {
            ngx_stream_lua_probe_info("parent already waiting");
            ctx->cur_co_ctx->waited_by_parent = 0;
            success = 1;
            goto user_co_done;
        }

        /* 否則將當前線程掛到父協程的殭屍子線程中 */
        if (ngx_stream_lua_post_zombie_thread(r, parent_coctx,
                                              ctx->cur_co_ctx)
            != NGX_OK)
        {
            return NGX_ERROR;
        }
        /* 壓入第一個返回值true,以備後續wait時返回 */
        lua_pushboolean(ctx->cur_co_ctx->co, 1);
        lua_insert(ctx->cur_co_ctx->co, 1);
        /* 設置當前線程狀態為ZOMBIE */
        ctx->cur_co_ctx->co_status = NGX_STREAM_LUA_CO_ZOMBIE;
        ctx->cur_co_ctx = NULL;
        return NGX_AGAIN;       /* 返回上層 */
    }
    /* 如果父協程已經死了,直接刪除當前線程
     * 會從REGISTY表中解引用當前協程的`coctx->co_ref` */
    ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
    ctx->uthreads--;
    /* 如果沒有用戶線程了 */
    if (ctx->uthreads == 0) {
        /* 入口線程在活着,返回上層 */
        if (ngx_stream_lua_entry_thread_alive(ctx)) {
            ctx->cur_co_ctx = NULL;
            return NGX_AGAIN;
        }

        /* all threads terminated already */
        goto done;  /* 到這就圓滿結束了 return NGX_OK; */
    }

    /* 如果還有其他用戶線程,返回上層 */
    ctx->cur_co_ctx = NULL;
    return NGX_AGAIN;
}

用戶協程

剩下的就是用戶協程的情況,這個情況跟用戶線程被父協程wait的情況是一樣的。主要是將返回值移動到父協程棧中,然後跳到主循環前面恢復父協程的執行。

success = 1;
/* 獲取返回值個數 */
nrets = lua_gettop(ctx->cur_co_ctx->co);
next_coctx = ctx->cur_co_ctx->parent_co_ctx;
next_co = next_coctx->co;
/* 將返回值移到父協程棧中 */
if (nrets) {
    lua_xmove(ctx->cur_co_ctx->co, next_co, nrets);
}
/* 如果是用戶線程,刪除之 */
if (ctx->cur_co_ctx->is_uthread) {
    ngx_stream_lua_del_thread(r, L, ctx, ctx->cur_co_ctx);
    ctx->uthreads--;
}
/* 除了wrap的用戶協程,加上第一個true的返回值 */
if (!ctx->cur_co_ctx->is_wrap) {
    /* ended successfully, coroutine.resume returns true plus
     * any return values
     */
    lua_pushboolean(next_co, success);
    lua_insert(next_co, 1);
    nrets++;
}

/* 設置父協程的狀態為RUNNING */
ctx->cur_co_ctx = next_coctx;
next_coctx->co_status = NGX_STREAM_LUA_CO_RUNNING;
/* 回到主循環前面,恢復父協程的執行 */
continue;

出錯的情況

大致處理步驟是,恢復cur_co_ctx,獲取虛擬機L棧上錯誤信息,獲取當前協程棧中錯誤信息,後面的操作類似協程執行完畢時,根據不同的情況選擇恢復父協程或者返回上層。

/* 恢復cur_co_ctx */
if (ctx->cur_co_ctx != orig_coctx) {
    ctx->cur_co_ctx = orig_coctx;
}
/* 設置當前協程狀態為DEAD */
ctx->cur_co_ctx->co_status = NGX_HTTP_LUA_CO_DEAD;
/* 獲取錯誤信息 */
if (orig_coctx->is_uthread
    || orig_coctx->is_wrap
    || ngx_http_lua_is_entry_thread(ctx))
{
    ngx_http_lua_thread_traceback(L, orig_coctx->co, orig_coctx);
    trace = lua_tostring(L, -1);

    if (lua_isstring(orig_coctx->co, -1)) {
        msg = lua_tostring(orig_coctx->co, -1);
        dd("user custom error msg: %s", msg);

    } else {
        msg = "unknown reason";
    }
}

用戶線程

跟正常結束的處理一樣,除了第一個返回值是false。

此時如果父協程已經死了,直接刪除線程,然後根據是否還有任何用戶線程或入口線程,選擇返回NGX_AGAINNGX_OK

如果父協程還活着,並且已經在wait它了,直接恢復父協程。否則,加入到父協程的殭屍線程列表中。

入口線程

ngx_stream_lua_request_cleanup()清理當前請求,裏面會清理掉所有的用戶創建的協程,然後清理入口協程自己。最後返回錯誤碼。

用戶協程

如果是wrap的協程,將錯誤傳遞給父協程(就好像是父協程出錯了,然後父協程重新走一遍上面的出錯處理流程)。

如果是普通協程,則恢復父協程的執行,返回false和錯誤信息。

參考資料

本博客已經遷移至CatBro’s Blog,那裡是我自己搭建的個人博客,頁面效果比這邊更好,支持站內搜索,評論回復還支持郵件提醒,歡迎關注。這邊只會在有時間的時候不定期搬運一下。

本篇文章鏈接