作業系統學習筆記-6:進程同步與進程互斥(三):經典問題

  • 2020 年 3 月 30 日
  • 筆記

在上一篇筆記中,我們介紹到了通過訊號量機制解決進程同步和進程互斥問題的原理,不過,在遇到實際問題的時候,訊號量機制到底是如何發揮作用的呢?這篇筆記將會從幾個經典的問題出發,在解決問題的過程中,我們會體會到訊號量機制的運用。

1 生產者 — 消費者問題

生產者 — 消費者問題描述的是一個對產品生產和消費的過程:首先,對於生產者,如果緩衝區沒滿,它就會生產產品並將其放入緩衝區,如果緩衝區已滿,它就會進行等待;對於消費者,如果緩衝區不空,它就會取走產品並釋放緩衝區,如果緩衝區已空,它就會進行等待。

對於這個問題,不難看出有兩個進程,一個是生產者進程,一個是消費者進程;同時有一個互斥關係,在同一時間內,只能有一個進程訪問同一個緩衝區,要麼放入產品,要麼取走產品;同時有兩個同步關係,一個指的是:必定是先生產產品,後取走產品,另一個指的是:必定是先釋放緩衝區,後使用緩衝區。

因此,我們在這裡需要準備兩個進程,一個是表示生產者進程的 producer,一個是表示消費者進程的 consumer;同時,我們需要準備三個訊號量。第一個訊號量是互斥訊號量,實現對緩衝區這個資源的互斥訪問,用 mutex = 1 表示;第二個訊號量是同步訊號量,表示空閑緩衝區的數量,用 empty = n 表示;第三個訊號量也是同步訊號量,表示非空閑緩衝區的數量,也即產品數量,用 full = 0 表示。

先考慮對互斥關係的實現。這裡所謂的臨界資源其實就是某一個緩衝區,生產者進程把產品放進去,消費者進程從裡面取走產品,這兩者不能同時進行,所以這裡是互斥的關係。我們可以想像到,對每一個進程而言,他都有屬於自己的一對 PV 操作,用以實現對緩衝區資源的訪問。另外,生產者在進行 PV 操作之前,必定要先生產產品;而消費者在進行 PV 操作之後,必定要使用產品。這時候,初步的偽程式碼是這樣的:

producer(){                         consumer(){      while(1){                           while(1){          生產產品                              P(mutex)          P(mutex)                             從緩衝區中取走產品          把產品放入緩衝區                       V(mutex)          V(mutex)                             使用產品      }                                   }  }                                   }

接著考慮第一個同步關係。關注緩衝區,可以知道一定是先釋放緩衝區,後使用緩衝區,所以這裡「前操作」是釋放緩衝區,「後操作」是使用緩衝區,根據上篇筆記所講的 「前VP後」,我們需要在「前操作」之後針對 empty 這個訊號量進行一次 V 操作,需要在「後操作」之前針對 empty 進行一次 P 操作。所以,這時候程式碼變成:

producer(){                         consumer(){      while(1){                           while(1){          生產產品                             P(mutex)          P(empty)                            從緩衝區中取走產品          P(mutex)                            V(mutex)          把產品放入緩衝區                      V(empty)          V(mutex)                            使用產品      }                                   }  }                                   }

再考慮第二個同步關係。關注產品本身,可以知道一定是先生產產品,後使用產品,更進一步地說,一定是先生產產品並將其放入緩衝區,後從緩衝區取出產品並使用。這裡劃分出前後兩個操作,所以再次安排一對 PV 操作。這時候,程式碼變成:

producer(){                         consumer(){      while(1){                           while(1){          生產產品                            P(full)          P(empty)                           P(mutex)          P(mutex)                           從緩衝區中取走產品          把產品放入緩衝區                     V(mutex)          V(mutex)                           V(empty)          V(full)                            使用產品      }                                   }  }                                   }

這個實際上就是最後的程式碼了。現在我們試著跑一下流程:初始的時候 empty = n,表示所有緩衝區都是空閑的,同時 full = 0,表示一個產品都沒生產出來。假如處理機首先來到 consumer 進程,那麼就會通過 P(full) 檢查是否有產品,這裡當然是沒有,所以它只能進行等待;處理機來到 producer,首先通過 P(empty) 檢查是否有空閑緩衝區,這裡當然有,於是它開始把生產的產品放入緩衝區,隨後記錄產品的數量,這個過程可以反覆進行,直到所有緩衝區都被佔用了,此時 producer 就會進入等待狀態,等待 consumer 進程取出產品、釋放緩衝區;當然還有可能的情況是,producer 尚未佔用完所有緩衝區,進程就切換到 consumer 了,那麼這時候 consumer 因為檢查到有產品,所以也會取出產品、釋放緩衝區。

這裡要注意可能會引起「死鎖」現象的一種寫法。如下所示:

producer(){                         consumer(){      while(1){                           while(1){          生產產品                            P(mutex)          P(mutex)                           P(full)          P(empty)                           從緩衝區中取走產品          把產品放入緩衝區                     V(mutex)          V(mutex)                           V(empty)          V(full)                            使用產品      }                                   }  }                                   }

這個寫法的問題在於,還沒有先檢查緩衝區是否沒滿或者是否非空,就強行進行互斥的「上鎖」。假如還是按照前面的流程,一開始處理機在 consumer 這裡,那麼 consumer 實際上在沒有檢查緩衝區是否非空的情況下就直接「上鎖」了,這導致它在 P(full) 這一步的時候被卡住,只能等待,而在之後切換到 producer 的時候,正常情況下他應該生產可供對方消費的產品,但是由於對方已經上了鎖,所以 producer 在 P(mutex) 這一步也被卡住了,只能進行等待。這也就是說,producer 等待 consumer 釋放臨界區,而 consumer 等待 producer 使用緩衝區,雙方陷入循環等待,造成了「死鎖」。

另一種情況,我們也可以設想一開始處理機是在 producer 這裡,那麼是不是不會導致「死鎖」呢?並不是。事實上,按照這種情況正常用完全部緩衝區之後,如果處理機還是在 producer 這裡,那麼 producer 在檢查緩衝區是否已滿之前也會將自己提前上鎖,在 P(empty) 這一步陷入等待,之後即使進程切換到 consumer 這裡,它也會因為對方提前上鎖,而導致自己在 P(mutex) 這一步陷入等待。也就是說,這不過是前面那種」死鎖「現象的翻版。

2. 橘子 — 蘋果問題

橘子 — 蘋果問題其實是生產者 — 消費者問題的一種變形,它有多種類型的生產者、多種類型的消費者,所以也叫多生產者 — 多消費者問題。它描述的是:有一個只能放一種水果的空盤子,父親可以放蘋果在上面,母親可以放橘子在上面,女兒只吃父親的蘋果,而兒子只吃母親的橘子。只有盤子空的時候父母親才放水果上去,只有盤子中的水果符合自己的喜好時,女兒或者兒子才會去拿。

從上面這個過程,我們可以分析一下有哪些變數。首先可以肯定的是,有四個進程,父母親扮演著生產者,女兒兒子扮演著消費者;同時有一個互斥關係,在同一個時間,只能有一個人去「訪問」盤子這個資源,所謂「訪問」,無非是四種情況:放蘋果、放橘子、拿蘋果、拿橘子;同時,有三個同步關係,第一個指的是,必定是先有父親放蘋果,後有女兒拿蘋果。第二個指的是,必定是先有母親放橘子,後有兒子拿橘子。第三個指的是,必定是先有女兒或者兒子拿走水果,後有父親或者母親放上水果。

因此,這裡我們準備四個進程:DadMomSonDaughter;同時,我們需要準備四個訊號量。第一個訊號量是互斥訊號量,實現對盤子這個資源的互斥訪問,用 mutex = 1 表示;第二個訊號量是同步訊號量,表示盤子中蘋果的數量,用 apple = 0 表示;第三個訊號量也是同步訊號量,表示盤子中橘子的數量,用 orange = 0 表示;第四個同樣是同步訊號量,表示盤子中還可以放多少水果,用 plate=1 表示。

接下來,我們仿照生產者—消費者問題,對偽程式碼進行推導。由於四個進程都有對互斥資源的訪問,也即都有一個 PV 操作,然後,對父親而言,在 PV 操作之前自己要先拿出蘋果,母親則是拿出橘子;對女兒而言,在 PV 操作之後要吃掉蘋果,兒子則是吃掉橘子。這時候,初步的偽程式碼是這樣的:

Dad(){                        Mom(){      while(1){                     while(1){          拿出蘋果                       拿出橘子          P(mutex)                      P(mutex)          把蘋果放入盤子                  把橘子放入盤子          V(mutex)                      V(mutex)      }                             }  }                             }  Daughter(){                   Son(){      while(1){                     while(1){          P(mutex)                      P(mutex)          從盤中取出蘋果                 從盤中取出橘子          V(mutex)                      V(mutex)          吃掉蘋果                      吃掉橘子      }                             }  }                             }

再來考慮第一類同步關係,關注盤子,可以看到必定是先讓盤子空出來,後讓人使用盤子,所以這裡前後操作就分清楚了,接下來再根據「前VP後」插入程式碼,此時的偽程式碼是這樣的:

Dad(){                        Mom(){      while(1){                     while(1){          拿出蘋果                       拿出橘子          P(plate)                      P(plate)          P(mutex)                      P(mutex)          把蘋果放入盤子                  把橘子放入盤子          V(mutex)                      V(mutex)      }                             }  }                             }  Daughter(){                   Son(){      while(1){                     while(1){          P(mutex)                     P(mutex)          從盤中取出蘋果                 從盤中取出橘子          V(mutex)                     V(mutex)          V(plate)                     V(plate)          吃掉蘋果                      吃掉橘子      }                             }  }                             }

最後考慮第二類同步關係,這次關注水果,可以看到必定是先拿出水果放到盤子上,後取出盤子上的水果吃掉,根據「前VP後」,最後的偽程式碼如下:

Dad(){                        Mom(){      while(1){                     while(1){          拿出蘋果                       拿出橘子          P(plate)                      P(plate)          P(mutex)                      P(mutex)          把蘋果放入盤子                  把橘子放入盤子          V(mutex)                      V(mutex)          V(apple)                      V(orange)      }                             }  }                             }  Daughter(){                   Son(){      while(1){                     while(1){          P(apple)                     P(orange)          P(mutex)                     P(mutex)          從盤中取出蘋果                 從盤中取出橘子          V(mutex)                     V(mutex)          V(plate)                     V(plate)          吃掉蘋果                      吃掉橘子      }                             }  }                             }

更進一步地,其實我們這裡並不需要互斥變數 mutex,試著把它去掉並跑一下流程:

Dad(){                        Mom(){      while(1){                     while(1){          拿出蘋果                       拿出橘子          P(plate)                      P(plate)          把蘋果放入盤子                  把橘子放入盤子          V(apple)                      V(orange)      }                             }  }                             }  Daughter(){                   Son(){      while(1){                     while(1){          P(apple)                     P(orange)          從盤中取出蘋果                 從盤中取出橘子          V(plate)                     V(plate)          吃掉蘋果                      吃掉橘子      }                             }  }                             }

在一開始,任何進程都可能先來到處理機,如果是兒子或者女兒,則會因為盤子中暫時沒有水果而被堵塞,如果是父母親,則會按照正常流程放水果上去,我們假設是第一種情況。由於兒子和女兒進程相繼被阻塞,所以進程來到父親這,拿出蘋果並放在盤子上。假設這中途因為時間片的緣故被切換到母親進程,母親進程由於盤子已經被使用,所以也進入阻塞隊列。之後又來到父親進程,由於它喚醒了女兒進程,女兒進程直接來到就緒隊列,並在父親進程執行完畢之後執行,吃掉蘋果。一吃掉蘋果,盤子就空了,喚醒了母親進程,母親進程放上橘子(這期間即使其它進程進來,也會被阻塞),喚醒了兒子進程,兒子進程就可以吃掉橘子了。

整個過程可能存在各種進程切換的情況,但是無論哪種情況,都可以保證做到進程同步和進程互斥,並且這是在不藉助互斥訊號量的前提下做到的。基於這個原因,我們在這裡可以不使用互斥訊號量。

3. 吸煙者問題

吸煙者問題其實是生產者 — 消費者問題的另一種變形,它有可以生產多種類型產品的一個生產者,以及多種類型的消費者,所以也叫單生產者 — 多消費者問題。它描述的是:有一個供應者和三個抽煙者,供應者擁有無限的煙草、紙、膠水三種材料,抽煙者1號只有煙草,抽煙者2號只有紙,抽煙者3號只有膠水。每次供應者提供其中兩種材料,其中一個抽煙者拿著這兩種材料與自己的材料結合進行抽煙,抽完再發送訊號給供應者,供應者重新供應材料。整個過程按照三個抽煙者輪流抽煙的順序循環往複。

從上面這個過程,我們可以分析一下有哪些變數。首先可以肯定的是,有四個進程,也就是一個供應者和三個抽煙者;同時有一個互斥關係,在同一個時間,只能有一個人去「訪問」桌子這個資源(假定材料放在桌子上),所謂訪問,無非就是放東西和拿東西兩個動作;同時,有四個同步關係,第一個指的是,必定是先放材料組合一,後有抽煙者1號拿材料。第二個指的是,必定是先放材料組合二,後有抽煙者2號拿材料。第三個指的是,必定是先放材料組合三,後有抽煙者3號拿材料;第四個指的是,必定是某一個抽煙者先發出訊號,後有供應者重新供應材料。

因此,這裡我們準備四個進程:ProviderSmoker1Smoker2Smoker3;同時,我們準備三個同類訊號量,即 offer1offer3,分別表示桌子上某個組合的數量。注意我們這裡不準備互斥訊號量,因為正如之前的問題提到的,這裡即使不引入互斥訊號量,也不會影響到我們對於進程互斥和同步的實現。另外,我們再準備一個訊號量 finish = 0 表示抽煙是否完成。除了這些,由於題目要求三個抽煙者是輪流抽煙的,所以對於供應者來說,它不能只提供單一材料組合,而要根據索取者的身份選擇給定的材料,這裡我們還需要再用一個變數 i 記錄具體是哪個抽煙者。從 0 到 2,分別表示三個抽煙者。而且在每一次給完材料之後,我們還要改變這個變數的值,好讓它指向下一個抽煙者,從而達到「輪流抽煙」的目的。

接下來,我們對偽程式碼進行推導。由於四個進程都有對互斥資源的訪問,也即都有一個 PV 操作,所以初步的偽程式碼是這樣的:

Provider(){           Smoker1(){          Smoker2(){        Smoker3(){      while(1){           while(1){           while(1){         while(1){       P(mutex)            P(mutex)            P(mutex)          P(mutex)       放組合一             拿走組合一           拿走組合二         拿走組合三       放組合二             抽煙,發訊號         抽煙,發訊號        抽煙,發訊號       放組合三             V(mutex)            V(mutex)          V(mutex)     }                    }                    }                 }  }                      }                   }                 }

不過,我們前面說了這裡不需要互斥訊號量,所以把它去掉。並且這裡 Provider 的三個操作肯定是要根據抽煙者身份來決定的,所以我們加個 if 判斷,以及對於 i 值的修改:

Provider(){                     Smoker1(){          Smoker2(){        Smoker3(){      while(1){                      while(1){           while(1){         while(1){        if(i==0){                      拿走組合一           拿走組合二         拿走組合三            放組合一                      抽煙,發訊號         抽煙,發訊號       抽煙,發訊號        } else if(i==1){             }                    }                 }          放組合二                 }                   }                 }        } else if(i==2){          放組合三        }        i = (i+1)%3    }  }

按照「前VP後」的原則,必定是先把某個組合放在桌子上,後有某個抽煙者把組合拿走,所以我們考慮在放置組合的語句後面加上 V(offer),在拿組合的語句前面加上 P(offer);同理,必定是抽煙者先抽完煙發出訊號,後有供應者重新放上材料組合,所以我們考慮在抽完煙的語句後面加上 V(finish),在放置組合的語句前面加上 P(finish),所以此時偽程式碼如下:

Provider(){      while(1){        P(finish)        if(i==0){            放組合一          V(offer1)        } else if(i==1){          放組合二          V(offer2)        } else if(i==2){          放組合三          V(offer3)        }        i = (i+1)%3    }  }  Smoker1(){          Smoker2(){        Smoker3(){    while(1){           while(1){         while(1){      P(offer1)           P(offer2)         P(offer3)      拿走組合一           拿走組合二         拿走組合三      抽煙,發訊號         抽煙,發訊號       抽煙,發訊號      V(finish)           V(finish)         V(finish)    }                    }                 }  }                   }                 }

不過這樣我們會發現,Provider 一上處理機就會被阻塞了,因為馬上就遇到了 P(finish),而此時根本還沒有人拿到煙,更不可能抽完煙。事實上,我們想要的效果只是「抽煙者先抽完煙發出訊號,供應者後重新供應」,也就是說,在抽煙者發出訊號之前,我們要阻止 Provider 到達「重新放置組合」這一步,我們下意識考慮到的是直接在放置組合語句前加上限制條件,但這樣會造成阻塞,所以放棄這個方案。那麼何不把限制條件放在放置組合語句後面呢?while 本來就是個首尾連接的循環,放置組合語句的尾部其實就是語句的首部,如果我們在尾部給予了限制,而限制條件滿足了,那麼照樣可以實現「阻止進程 Provider 到達放置組合語句」這個效果。如下所示:

Provider(){      while(1){        if(i==0){            放組合一          V(offer1)        } else if(i==1){          放組合二          V(offer2)        } else if(i==2){          放組合三          V(offer3)        }        i = (i+1)%3        P(finish)    }  }  Smoker1(){          Smoker2(){        Smoker3(){    while(1){           while(1){         while(1){      P(offer1)           P(offer2)         P(offer3)      拿走組合一           拿走組合二         拿走組合三      抽煙,發訊號         抽煙,發訊號       抽煙,發訊號      V(finish)           V(finish)         V(finish)    }                    }                 }  }                   }                 }

可以看到,這裡一定會是抽煙者抽完煙發出訊號之後,Provider 才有辦法重新放置組合。因為但凡 Provider 想要搶先一步重新放置組合,它都要經過底部 P(finish) 的檢查,而這個檢查在 Smoker 發出訊號之前,不可能輕易通過,因此 Provider 如果想要「強闖」,只會導致自己進入阻塞隊列,之後進程切換到 Smoker,Smoker 發出訊號,V(finish) 將 Provider 喚醒,Provider 才能重新放置組合,由此就實現了「先發出訊號,後重新放置組合」的效果。

4. 讀者 — 寫者問題

讀者 — 寫者問題與我們之前遇到的問題類型不同,它描述的是:有讀者和寫者兩組進程,它們共同訪問同一個文件。對於讀者,它可以與多個讀者共同讀取文件(因為不會修改到文件);對於寫者,它不能與其他任何進程共同訪問文件(如果另一進程是寫,則可能覆蓋同一內容;如果是讀,則可能修改到要讀的內容)。也就是說,這裡的互斥問題是讀寫互斥的問題,但與之前不同的是,除了實現讀寫的互斥,我們還要實現讀讀的「不互斥」。

首先準備一個訊號量 rw = 1 表示當前是否有進程在訪問文件(注意一開始是沒有這樣的進程的,1 表示的不是進程數目,僅僅是使用互斥訊號量時習慣上給定的初始值,這個看 wait 的程式碼就能理解了)。

在不考慮「讀讀不互斥」的情況下,我們的偽程式碼是這樣的:

Writer(){               Reader(){      while(1){             while(1){          P(rw)                P(rw)          寫文件               讀文件          V(rw)                V(rw)      }                     }  }                      }

這個程式碼可以實現讀寫互斥,但顯然無法實現「讀讀不互斥」,因為每個讀進程之間也會受到 rw 的影響,使得最後只能有一個讀進程訪問文件。於是我們考慮從讀進程入手,做一些改進。這裡讀和讀不能同時進行的本質原因在於,所有的讀進程都會經歷「檢查並上鎖」這個步驟,而一個讀進程進入後就會馬上檢查並上鎖,導致另一個也想要進入的讀進程被阻塞,所以我們考慮:能不能不要讓所有的讀進程都經歷「檢查並上鎖」這一步驟?也就是說,某些進程可以跳過 P 操作,直接進入臨界區,這樣一來,這些進程就不存在在 P 操作這裡被阻塞的可能性。

什麼樣的進程可以跳過 P 操作呢?就是中間的那些讀進程。因為一開始肯定要有讀進程上鎖、最後肯定要有讀進程解鎖,所以上鎖和解鎖的任務交付給第一個和最後一個進程,而中間的那些進程來去自如,只需要負責讀文件,不需要參與上鎖和解鎖。為了區分讀進程的次序,我們準備一個 count = 0 的變數,它表示的是當前有多少個讀進程正在訪問文件。然後在讀文件的前後,我們分別對 count 進行加一和減一的操作,每次讀文件開始之前 count 會加一,所以在此之前如果變數為 0 ,說明當前讀進程是第一個讀進程;同理,每次讀文件之後 count 會減一,所以在此之後如果變數為 0 ,說明當前讀進程是最後一個讀進程;

此時偽程式碼如下:

Reader(){      while(1){          if(count==0)              P(rw)          count++          讀文件          count--          if(count==0)              V(rw)      }  }

但是這樣會產生一些問題。比方 1 號讀進程首先進入並上鎖,然後在 P 操作之後、count 加一變成 1 之前,進程切換到 2 號讀進程,那麼 2 號讀進程就會卡在 P 操作這個地方,陷入阻塞,顯然這時候無法實現我們想要的「讀讀不互斥」;又比方說,1 號讀進程在 count 減一變成 0 之後、釋放 rw 之前,進程切換到了 2 號讀進程,那麼 2 號同樣又會被卡在 P 操作這裡。所以我們還要進行改進。

問題其實就出在,對 count 的檢查和賦值不是一個原子操作,這導致的結果是,如果在檢查和賦值之間的空隙,進程發生切換,則必然會使得另一進程陷入阻塞。那麼能不能讓這兩個操作一氣呵成呢?事實上,可以把 count 當作是一個互斥訪問的資源,對 count 的訪問是互斥的,也就說明一個時間段內只能有一個讀進程去訪問它,即使這個過程中切換到了其它進程,那個進程也會被阻塞,從而保證只有一個進程可以訪問 count,而這個訪問就是檢查和賦值,這種情況下,檢查和賦值一定是不會被中斷的。

準備一個互斥訊號量 mutex = 1 表示對 count 的互斥訪問,將檢查和賦值封裝在一個 PV 操作里。偽程式碼如下:

Reader(){      while(1){          P(mutex)          if(count==0)              P(rw)          count++          V(mutex)          讀文件            P(mutex)          count--          if(count==0)              V(rw)          V(mutex)      }  }

現在我們再來跑一下過程。假設還是 1 號讀進程運行到 P 操作的時候,進程切換到了 2 號讀進程,那麼由於互斥訊號量 mutex 的存在,導致 2 號進程進入了 mutex 對應的阻塞隊列 —— 是的,這時候看起來 2 號進程還是被阻塞了,不過我們要關注到的是,阻塞它的訊號量是 mutex,不是 rw。這意味著,在進程重新切換回 1 號進程的時候,1 號進程一旦執行了 V(mutex),就可以將 2 號進程喚醒並送到就緒隊列了。也就是說,儘管 2 號進程還是經歷了「阻塞」這個過程,但是這個過程只是為了確保 1 號進程檢查和上鎖兩個操作的原子性,一旦操作完成,2 號進程馬上就被喚醒了。而之前那種情況不同,之前的情況是,導致 2 號進程被阻塞的是訊號量 rw,除非 1 號進程讀完後釋放,否則 2 號進程會一直處於阻塞狀態。這就是說,2 號進程永遠不可能與 1 號進程同時讀文件,但是改進後是可以的。

但隨之而來的又是另一個問題, 「讀寫不公平」。也就是說,這樣的程式碼本質上是對讀進程更有利的。

因為對讀進程來說,一旦第一個讀進程進來了,中間即使穿插再多的讀進程,也都是允許的,他們根本不受到 rw 這個「鎖」的限制;而對於寫進程,它的運氣就沒這麼好了,寫進程只要想進來,就必須通過 rw 這個「鎖」,而這個「鎖」 實際上又掌握在最後一個讀進程手裡 —— 這就是說,萬一讀進程源源不斷進來,始終輪不到最後一個讀進程解鎖,那麼寫進程就只能陷入無盡的等待了。

既然 rw 這把鎖無法做到公正對待每一個進程,那我們就考慮在外層加一把「更大、更公正的鎖」。也就是說,所有的進程,無論讀還是寫,無一例外必須通過這把「鎖」的檢查。為此,我們準備一個新的互斥訊號量 w = 1,並將 Writer 和 Reader 的一些關鍵操作封裝在 w 的一對 PV 操作里。此時,偽程式碼如下:

我們來跑一下流程。假設首先來到 1 號讀進程,那麼它就會執行 P 操作上鎖,這個過程中即使有寫進程想進來,也會被送到 w 對應的阻塞隊列。在 1 號讀進程執行到 V 操作之後,寫進程才會被喚醒並送到就緒隊列,之後就輪到寫進程執行了,而寫進程雖然通過第一個 P 操作,但是被卡在了第二個 P 操作(讀進程尚未釋放 rw),所以他來到了 rw 對應的阻塞隊列。

注意!重點來了,如果這時候 2 號讀進程也想要訪問文件,那麼在以前,它是不需要通過任何檢查就可以直接來讀文件的,並且直到 2 號讀進程釋放 rw 之後,寫進程才能真正來執行寫文件的操作。但是現在由於我們加了一把「更大的鎖」,導致 2 號進程也必須先通過 w 的檢查,而由於寫進程搶先在他之前上了鎖,所以 2 號讀進程被送到了 w 對應的阻塞隊列。也就是說,現在的情況是:寫進程等著 1 號讀進程釋放 rw,而 2 號讀進程等著寫進程釋放 w,1 號讀進程是讓一切正常進行下去的關鍵。在處理機又來到 1 號讀進程並執行 V(rw) 之後,寫進程從 rw 的阻塞隊列被喚醒,繼續往下執行寫文件的操作。而在寫進程真正執行完之後,w 才能得到釋放,由此又喚醒了 w 阻塞隊列中的 2 號讀進程,2 號讀進程來到處理機運行。

如果換一種情況,是按照 寫者 — 讀者 — 寫者的順序,那麼由於讀者在第二個寫者之前,所以是讀者作為阻塞隊列隊頭,第二個寫者則次之,在後續執行過程中,根據隊列「先進先出」的原則,也會是讀者先於第二個寫者訪問文件。

也就是說,實際上誰先到、誰就在後續過程中先被執行(而不是像之前那種情況,無論寫進程先還是後,讀進程都可以「無視規則」搶先一步執行)。由此,我們就實現了「讀寫公平」。

5. 哲學家就餐問題

最後我們再來看哲學家就餐問題。這個問題描述的是:一張圓桌,五個哲學家,五支筷子(每兩個哲學家中間會有一支筷子),哲學家要麼思考,要麼吃飯,吃飯前會拿起左右兩邊的筷子,吃飯後會放下筷子。如果筷子被其它哲學家拿走,則自己需要等待。我們的目的很簡單:保證所有哲學家都有機會吃上飯,不能讓一個或者多個哲學家始終無法吃飯

首先,五個哲學家對應了五個進程,然後在同一個時間段內,對於同一支筷子,只能有一個哲學家去使用它,所以筷子是一種臨界資源,我們用互斥訊號量 chopstick = 1 表示。鑒於這裡有五支筷子,所以我們準備一個互斥訊號量數組 chopstick[5] = {1,1,1,1,1}。另外,由於任何一個哲學家都只可能拿離自己最近的左右筷子,所以為了加以區分,我們需要給哲學家和筷子進行編號。對於哲學家,從 0 到 4 進行編號,由於哲學家按照圓桌首尾連接,所以某個哲學家左右兩邊的筷子編號與自己本身的編號相關。以哲學家 i 為例,它左邊的筷子編號是 i。右邊則是 (i+1)%5,如下圖所示:

對每一個哲學家進程來說,它都只能拿起左右兩邊的筷子,並且一定是吃飯前拿筷子,吃飯後放下筷子,所以初步的偽程式碼是這樣的(這裡忽略思考的過程):

Pi(){      while(1){          P(chopstick[i])          P(chopstick[(i+1)%5])          eat()          V(chopstick[i])          V(chopstick[(i+1)%5])      }  }

如果哲學家 0 號拿起左右筷子後,進程切換到哲學家 1 號,那麼哲學家 1 號是會被阻塞的,所以這樣寫可以保證相鄰的哲學家中有一個可以吃飯。但是,如果是拿起左筷子,之後進程切換到 1 號,那麼 1 號也會拿起自己的左筷子,以此類推,直到 4 號也拿起自己的左筷子。接著,進程來到 0 號,這時候,0 號會發現自己已經沒有右筷子可以拿了(作為 1 號的左筷子),於是 0 號被阻塞;同理,1 號也沒有右筷子,所以他也被阻塞……以此類推,由於所有的哲學家都只有左筷子,他們的右筷子都在其他人手裡,這導致了所有的哲學家都被阻塞,等待著另一個哲學家釋放筷子。但實際上,沒有任何哲學家能夠吃飯,因此沒有人可以釋放筷子,這使得這些哲學家都陷入無限的等待中,造成「死鎖」的發生。

解決這個問題有三個方法。

(1)實現原子操作

很容易想到的是,這裡的一個問題在於,拿起左筷子和拿起右筷子並不是一個原子操作,如果在這之間發生了進程切換,那麼就可能會像上面那樣導致「死鎖」的發生。所以我們設想能否將這兩個操作一氣呵成完成。按照前面問題的思路,第一種做法就是準備一個互斥訊號量 mutex = 1 ,並把拿筷子的操作封裝在一個 PV 操作里。程式碼如下:

Pi(){      while(1){          P(mutex)          P(chopstick[i])          P(chopstick[(i+1)%5])          V(mutex)          eat()          V(chopstick[i])          V(chopstick[(i+1)%5])      }  }

這樣一來,在 0 號哲學家拿起左筷子之後,即使發生進程切換,來到 1 號進程,1 號進程也會被卡在 mutex 的 P 操作這裡。被送往阻塞隊列,其它進程也同理。所以最後又來到了 0 號進程,0 號進程順利拿起了右筷子,之後釋放阻塞隊列隊頭的 1 號進程,自己開始吃飯。這種做法保證了有一個哲學家是可以吃飯的,不存在「所有哲學家都無法吃飯」的情況。

另外,這裡涉及到了一個進程需要一次性兩個資源才能完成任務的問題,這時候也可以考慮使用我們之前提到的 AND 訊號量集機制。我們回顧一下,AND 訊號量集機制的 P 操作是一個相對苛刻的操作,要求一個進程要麼拿到全部資源,要麼一個資源也拿不到,所以這裡可以做到的是:對於初始的那個 0 號進程,他在拿筷子的時候要麼左右筷子都拿到,要麼一支筷子都拿不到。由於一開始筷子數量足夠,所以它在一開始就可以一次性拿到左右筷子。同理,在釋放筷子的時候,也是一次性釋放兩支筷子。

程式碼如下:

Pi(){      while(1){          Swait(chopstick[(i+1)%5],chopstick[i]);          eat()          Ssignal(chopstick[(i+1)%5],chopstick[i]);      }  }

因此,我們同樣可以保證有一個哲學家是可以吃飯的,不存在「所有哲學家都無法吃飯」的情況。

(2)只有四個人可以參與這個過程

我們也可以從另一個角度考慮,之前的情況是五個哲學家,五支筷子,所以很容易出現誰也無法吃飯的情況,但是如果我們規定整個過程最多只有四個哲學家參與,那麼即使這四個哲學家每個人都拿走了一支筷子,也還剩下一支筷子可以分配給某個哲學家。換句話說,這樣做,我們同樣可以讓至少一個哲學家吃到飯。

問題在於如何限定「只有最多四個人可以參與這個過程」呢?我們可以準備一個互斥訊號量 count,但是這個訊號量初值不再是 1 ,而是 4,表示最多允許四個哲學家參與過程。當 count 減少到 -1 的時候,就不能再讓哲學家進來了,因此可以保證最多只有四個哲學家。程式碼如下:

Pi(){      while(1){          P(count)          P(chopstick[i])          P(chopstick[(i+1)%5])          eat()          V(chopstick[i])          V(chopstick[(i+1)%5])          V(count)      }  }

我們再來演示前面發生「死鎖」的過程。假如一開始還是從 0 號進程開始,在他拿到左筷子之後,進程切換到 1 號進程,由於 count 數量充足,所以它不會阻塞,而是同樣拿到了左筷子……以此類推,到了 4 號哲學家的時候,由於 count = -1<0,所以它此時是無法進來的。所以有一支筷子在一開始誰也沒拿到,就是 4 號哲學家左邊的筷子。而在稍後進程輪到 3 號哲學家的時候,它是可以拿到這支筷子然後去吃飯的。

這只是其中一種情況,但即便是其它情況,也能保證剩下一支暫時沒被用到的筷子,而這支筷子也一定會在最後被某個進程拿走。因此得以保證總會存在至少一個進程可以吃到飯。

(3)奇數拿左邊,偶數拿右邊

還可以考慮對拿筷子的優先順序進行調整。規定對於奇數哲學家,總是先拿左筷子再拿右筷子;對於偶數哲學家,總是先拿右筷子再拿左筷子。那麼 0 號哲學家和 1 號哲學家就會爭奪 1 號筷子,而 2 號哲學家和 3 號哲學家就會爭奪 3 號筷子。如圖所示:

偽程式碼如下:

Pi(){      while(1)      {          if(i%2 == 0) // 偶數哲學家,先右後左。          {              P(chopstick[(i + 1)%5]) ;              P(chopstick[i]) ;              eat();              V(chopstick[(i + 1)%5]) ;              V(chopstick[i]) ;          }          else   //奇數哲學家,先左後右。          {              P(chopstick[i]) ;              P(chopstick[(i + 1)%5]) ;              eat();              V(chopstick[i]) ;              V(chopstick[(i + 1)%5]) ;          }      }  }

假如一開始還是從 0 號進程開始,很顯然 0 號先拿到 1 號筷子,所以在之後切換進程的時候,1 號進程就會直接被阻塞。接著來到 2 號進程,顯然他先拿到 3 號筷子,所以之後輪到 3 號進程的時候,3 號進程直接被阻塞。現在考慮輪到 4 號進程,它優先拿到右邊的 0 號筷子,之後進程又切換到其它地方。但是,由於先前 3 號進程已經被阻塞,所以在再次輪到 4 號進程的時候,並沒有人和他一起爭奪 4 號筷子,換言之,4 號進程可以拿到 4 號筷子,再加上之前優先拿到的 0 號筷子,4 號進程現在就可以吃飯了。

值得提醒的是,這仍然只是其中一種情況,類似的排列組合有很多,但是無論是哪一種,在一對進程的爭搶中必然有一個進程首先被送到阻塞隊列,「被淘汰出局」,因此這個「被淘汰的」進程很難再去影響其它進程,相當於間接提高了其它進程拿到筷子的可能性。就像我們上面的例子一樣,一下子「淘汰了」 1 號和 3 號兩個進程,並且這兩個進程當時並沒有帶著筷子進入阻塞隊列,所以對於其它進程 2 號、4 號、 0 號來說,再次拿到一支筷子的可能性就大大提高了。所以,我們還是能夠達到最初的目的,也就是至少讓一個哲學家吃上飯。