xv6學習筆記(5) : 鎖與管道與多cpu

xv6學習筆記(5) : 鎖與管道與多cpu

1. xv6鎖結構

1. xv6操作系統要求在內核臨界區操作時中斷必須關閉。

如果此時中斷開啟,那麼可能會出現以下死鎖情況:

  1. 進程A在內核態運行並拿下了p鎖時,觸發中斷進入中斷處理程序。
  2. 中斷處理程序也在內核態中請求p鎖,由於鎖在A進程手裡,且只有A進程執行時才能釋放p鎖,因此中斷處理程序必須返回,p鎖才能被釋放。
  3. 那麼此時中斷處理程序會永遠拿不到鎖,陷入無限循環,進入死鎖。

2. xv6中的自旋鎖

Xv6中實現了自旋鎖(Spinlock)用於內核臨界區訪問的同步和互斥。自旋鎖最大的特徵是當進程拿不到鎖時會進入無限循環,直到拿到鎖退出循環。Xv6使用100ms一次的時鐘中斷和Round-Robin調度算法來避免陷入自旋鎖的進程一直無限循環下去。Xv6允許同時運行多個CPU核,多核CPU上的等待隊列實現相當複雜,因此使用自旋鎖是相對比較簡單且能正確執行的實現方案。

v6中鎖的定義如下

// Mutual exclusion lock.
struct spinlock {
  uint locked;       // Is the lock held?

  // For debugging:
  char *name;        // Name of lock.
  struct cpu *cpu;   // The cpu holding the lock.
  uint pcs[10];      // The call stack (an array of program counters)
                     // that locked the lock.
};

核心的變量只有一個locked,當locked為1時代表鎖已被佔用,反之未被佔用,初始值為0。

在調用鎖之前,必須對鎖進行初始化。

void initlock(struct spinlock *lk, char *name) {
  lk->name = name;
  lk->locked = 0;
  lk->cpu = 0;
}

這裡的需要注意的就是如何對locked變量進行原子操作佔用鎖和釋放鎖。

這兩步具體被實現為acquire()release()函數。

1. acquire函數

  1. 首先禁止中斷防止死鎖(如果這裡不禁止中斷就可能會出現上述的死鎖問題
  2. 然後利用xchg來上鎖
void
acquire(struct spinlock *lk)
{
  pushcli(); // disable interrupts to avoid deadlock.
  if(holding(lk))
    panic("acquire");

  // The xchg is atomic.
  while(xchg(&lk->locked, 1) != 0)
    ;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that the critical section's memory
  // references happen after the lock is acquired.
  __sync_synchronize();

  // Record info about lock acquisition for debugging.
  lk->cpu = mycpu();
  getcallerpcs(&lk, lk->pcs);
}

xchg這裡採用內聯彙編實現

最前面的lock表示這是一條指令前綴,它保證了這條指令對總線和緩存的獨佔權,也就是這條指令的執行過程中不會有其他CPU或同CPU內的指令訪問緩存和內存。

static inline uint xchg(volatile uint *addr, uint newval) {
  uint result;
  // The + in "+m" denotes a read-modify-write operand.
  asm volatile("lock; xchgl %0, %1" :
               "+m" (*addr), "=a" (result) :
               "1" (newval) :
               "cc");
  return result;
}

最後,acquire()函數使用__sync_synchronize為了避免編譯器對這段代碼進行指令順序調整的話和避免CPU在這塊代碼採用亂序執行的優化。

這樣就保證了原子操作的加鎖。也就是把lk->locked設置為1

2. release函數

// Release the lock.
void release(struct spinlock *lk) {
  if(!holding(lk))
    panic("release");

  lk->pcs[0] = 0;
  lk->cpu = 0;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that all the stores in the critical
  // section are visible to other cores before the lock is released.
  // Both the C compiler and the hardware may re-order loads and
  // stores; __sync_synchronize() tells them both not to.
  __sync_synchronize();

  // Release the lock, equivalent to lk->locked = 0.
  // This code can't use a C assignment, since it might
  // not be atomic. A real OS would use C atomics here.
  asm volatile("movl $0, %0" : "+m" (lk->locked) : );

  popcli();
}

release函數為了保證設置locked為0的操作的原子性,同樣使用了內聯彙編。

2. 信號量

關於信號量的討論

在xv6中沒有信號量,但是我看博客有大佬利用xv6中提供的接口實現了信號量

3. 喚醒與睡眠

1. 先看有問題的版本

struct q {
    struct spinlock lock;
    void *ptr;
};

void *
send(struct q *q, void *p)
{
    acquire(&q->lock);
    while(q->ptr != 0)
        ;
    q->ptr = p;
    wakeup(q);
    release(&q->lock);
}

void*
recv(struct q *q)
{
    void *p;
    acquire(&q->lock);
    while((p = q->ptr) == 0)
        sleep(q);
    q->ptr = 0;
    release(&q->lock;
    return p;
}

這個代碼乍一看是沒什麼問題,發出消息之後,調用wakeup喚醒進程來接受。而且在發送的過程中保持鎖。反正出現問題。在發送完成後釋放鎖。同時wakeup會讓進程從seleep狀態返回,這裡的返回就會到recv函數來進行接受。

但是這裡的問題在於噹噹 recv 帶着鎖 q->lock 進入睡眠後,發送者就會在希望獲得鎖時一直阻塞。

所以想要解決問題,我們必須要改變 sleep 的接口。sleep 必須將鎖作為一個參數,然後在進入睡眠狀態後釋放之;這樣就能避免上面提到的「遺失的喚醒」問題。一旦進程被喚醒了,sleep 在返回之前還需要重新獲得鎖。於是我們應該使用下面的代碼:

2. 上面版本的改進

struct q {
    struct spinlock lock;
    void *ptr;
};

void *
send(struct q *q, void *p)
{
    acquire(&q->lock);
    while(q->ptr != 0)
        ;
    q->ptr = p;
    wakeup(q);
    release(&q->lock);
}

void*
recv(struct q *q)
{
    void *p;
    acquire(&q->lock);
    while((p = q->ptr) == 0)
        sleep(q, &q->lock);
    q->ptr = 0;
    release(&q->lock;
    return p;
}

recv 持有 q->lock 就能防止 sendrecv 檢查 q->ptr 與調用 sleep 之間調用 wakeup 了。當然,為了避免死鎖,接收進程最好別在睡眠時仍持有鎖。所以我們希望 sleep 能用原子操作釋放 q->lock 並讓接收進程進入休眠狀態。

完整的發送者/接收者的實現還應該讓發送者在等待接收者拿出前一個 send 放入的值時處於休眠狀態。

3. xv6的wakeup和sleep

sleep(chan) 讓進程在任意的 chan 上休眠,稱之為等待隊列(wait channel)sleep 讓調用進程休眠,釋放所佔 CPU。wakeup(chan) 則喚醒在 chan 上休眠的所有進程,讓他們的 sleep 調用返回。如果沒有進程在 chan 上等待喚醒,wakeup 就什麼也不做。

總體思路是希望 sleep 將當前進程轉化為 SLEEPING 狀態並調用 sched 以釋放 CPU,而 wakeup 則尋找一個睡眠狀態的進程並把它標記為 RUNNABLE

1. sleep函數

  1. 首先sleep會做幾個檢查:必須存在當前進程、並且 sleep 必須持有鎖(2558-2559)。接着 sleep 要求持有 ptable.lock
  2. 於是該進程就會同時持有鎖 ptable.locklk 了。調用者(例如 recv)是必須持有 lk 的,這樣可以保證其他進程(例如一個正在運行的 send)無法調用 wakeup(chan)。而如今 sleep 已經持有了 ptable.lock,那麼它現在就能安全地釋放 lk 了:這樣即使別的進程調用了 wakeup(chan)wakeup 也不可能在沒有持有 ptable.lock 的情況下運行,所以 wakeup 必須等待 sleep 讓進程睡眠後才能運行。這樣一來,wakeup 就不會錯過 sleep 了。
  3. 這裡要注意如果lk == ptable.lock的話則就會跳過對於if語句的判斷
  4. 後面做的就是把當前進程放入chan中(等待隊列)
  5. 隨後進行調度交出cpu
void
sleep(void *chan, struct spinlock *lk)
{
  struct proc *p = myproc();
  
  if(p == 0)
    panic("sleep");

  if(lk == 0)
    panic("sleep without lk");

  // Must acquire ptable.lock in order to
  // change p->state and then call sched.
  // Once we hold ptable.lock, we can be
  // guaranteed that we won't miss any wakeup
  // (wakeup runs with ptable.lock locked),
  // so it's okay to release lk.
  if(lk != &ptable.lock){  //DOC: sleeplock0
    acquire(&ptable.lock);  //DOC: sleeplock1
    release(lk);
  }
  // Go to sleep.
  p->chan = chan;
  p->state = SLEEPING;

  sched();

  // Tidy up.
  p->chan = 0;

  // Reacquire original lock.
  if(lk != &ptable.lock){  //DOC: sleeplock2
    release(&ptable.lock);
    acquire(lk);
  }
}

2. wakeup函數

  1. 遍歷所有的進程表
  2. 判斷p->chan == chan 設置為runable
// Wake up all processes sleeping on chan.
void
wakeup(void *chan)
{
  acquire(&ptable.lock);
  wakeup1(chan);
  release(&ptable.lock);
}
// wakeup --> wakeup1
// Wake up all processes sleeping on chan.
// The ptable lock must be held.
//
static void
wakeup1(void *chan)
{
  struct proc *p;

  for(p = ptable.proc; p < &ptable.proc[NPROC]; p++)
    if(p->state == SLEEPING && p->chan == chan)
      p->state = RUNNABLE;
}


4.管道

在xv6中管道是比較複雜的使用 sleep/wakeup 來同步讀者寫者隊列的例子。

1. 管道結構體

struct pipe {
  struct spinlock lock;
  char data[PIPESIZE];
  uint nread;     // number of bytes read
  uint nwrite;    // number of bytes written
  int readopen;   // read fd is still open
  int writeopen;  // write fd is still open
};

我們從管道的一端寫入數據位元組,然後數據被拷貝到內核緩衝區中,接着就能從管道的另一端讀取數據了。

中有一個鎖 lock和內存緩衝區。其中的域 nreadnwrite 表示從緩衝區讀出和寫入的位元組數。pipe 對緩衝區做了包裝,使得雖然計數器繼續增長,但實際上在 data[PIPESIZE - 1] 之後寫入的位元組存放在 data[0]。這樣就讓我們可以區分一個滿的緩衝區(nwrite == nread + PIPESIZE)和一個空的緩衝區(nwrite == nread),但這也意味着我們必須使用 buf[nread % PIPESIZE] 而不是 buf[nread] 來讀出/寫入數據。

2. Piperead()函數

  1. 當緩衝數據區為空的時候(並且writeopen被打開了)這個時候就要等。因為寫操作還沒完成所以主動sleep
  2. 否則就可以讀了
  3. 讀完了就喚醒寫
int
piperead(struct pipe *p, char *addr, int n)
{
  int i;

  acquire(&p->lock);
  while(p->nread == p->nwrite && p->writeopen){  //DOC: pipe-empty
    if(myproc()->killed){
      release(&p->lock);
      return -1;
    }
    sleep(&p->nread, &p->lock); //DOC: piperead-sleep
  }
  for(i = 0; i < n; i++){  //DOC: piperead-copy
    if(p->nread == p->nwrite)
      break;
    addr[i] = p->data[p->nread++ % PIPESIZE];
  }
  wakeup(&p->nwrite);  //DOC: piperead-wakeup
  release(&p->lock);
  return i;
}

3. pipewrite()函數

  1. 當緩衝區已經滿了,但是readopen被打開了,這個時候就要喚醒讀緩衝區的進程去讀操作。
  2. 否則就是簡單的寫操作了
  3. 寫完了就喚醒讀
int
pipewrite(struct pipe *p, char *addr, int n)
{
  int i;

  acquire(&p->lock);
  for(i = 0; i < n; i++){
    while(p->nwrite == p->nread + PIPESIZE){  //DOC: pipewrite-full
      if(p->readopen == 0 || myproc()->killed){
        release(&p->lock);
        return -1;
      }
      wakeup(&p->nread);
      sleep(&p->nwrite, &p->lock);  //DOC: pipewrite-sleep
    }
    p->data[p->nwrite++ % PIPESIZE] = addr[i];
  }
  wakeup(&p->nread);  //DOC: pipewrite-wakeup1
  release(&p->lock);
  return n;
}

5. xv6下的多cpu啟動

這裡多cpu主要結合828的lab來一起分析。要是有不同的也會做一下對比

在xv6中和828中對於多cpu的支持都是SMP體系架構下的。

在SMP下所有的cpu的地位是一模一樣的,他們具有相同的權限,相同的資源。所有CPU在SMP中在功能上相同,但在引導過程中,它們可以分為兩種類型:

  • 引導處理器BSP(bootstrap processor)負責初始化系統並且引導操作系統。
  • 應用處理器AP(application processor)在操作系統啟動之後被BSP激活。

由哪個(些)處理器來擔任BSP的功能是由BIOS和硬件決定的,之前的所有代碼都是在BSP上實現的。

總的步驟如下參考

  1. BIOS 啟動 BSP,完成讀取內核balabala的操作一直到main.c中

  2. BSP 從 MP Configuration Table 中獲取多處理器的的配置信息

  3. BSP 啟動 APs,通過發送INIT-SIPI-SIPI消息給 APs

  4. APs 啟動,各個 APs 處理器要像 BSP 一樣建立自己的一些機制,比如保護模式,分頁,中斷等等

關於多核啟動在網上看見了一個很好的博客我這裡也是大多參考於它

1. mpinit函數

mpinit函數負責是檢測CPU個數並將檢測到的CPU存入一個全局的數組中。

這裡我們需要解析MP Configuration Table。而在xv6中這個信息使用mp結構體來存儲的。下面是要注意的點!!

這個結構只可能出現在三個位置,尋找 floating pointer 的時候就按下面的循序查找:

  1. EBDA(Extended BIOS Data Area)最開始的 1KB
  2. 系統基本內存的最後 1KB (對於 640 KB 的基本內存來說就是 639KB-640KB,對於 512KB 的基本內存來說就是 511KB-512KB)
  3. BIOS 的 ROM 區域,在 [公式][公式] 之間
  1. 第一步要調用mpconfig函數來解析出mp

    void
    mpinit(void)
    {
      uchar *p, *e;
      int ismp;
      struct mp *mp;
      struct mpconf *conf;
      struct mpproc *proc;
      struct mpioapic *ioapic;
    
      if((conf = mpconfig(&mp)) == 0)
        panic("Expect to run on an SMP");
    

    mpconfig函數最重要的就是利用mpserach函數

    關於對照關係上面給了一個表格

    這裡就是完全按照上面說的三個順序進行測試的,只不過對應的地址變化(我也不會,也懶得看了。)就是知道他是依此去這三個地方find mp就行了

    static struct mp* mpsearch(void)     //尋找mp floating pointer 結構
    {
      uchar *bda;
      uint p;
      struct mp *mp;
    
      bda = (uchar *) P2V(0x400);     //BIOS Data Area地址
        
      if((p = ((bda[0x0F]<<8)| bda[0x0E]) << 4)){  //在EBDA中最開始1K中尋找
        if((mp = mpsearch1(p, 1024)))
          return mp;
      } else {                                 //在基本內存的最後1K中查找
        p = ((bda[0x14]<<8)|bda[0x13])*1024;    
        if((mp = mpsearch1(p-1024, 1024)))
          return mp;
      }
      return mpsearch1(0xF0000, 0x10000);   //在0xf0000~0xfffff中查找
    }
    
  2. 下面就是根據讀到的conf來解析cpu信息

  ismp = 1;
  lapic = (uint*)conf->lapicaddr;
  for(p=(uchar*)(conf+1), e=(uchar*)conf+conf->length; p<e; ){
    switch(*p){
    case MPPROC: // 如果是處理器
      proc = (struct mpproc*)p;
      if(ncpu < NCPU) {
        cpus[ncpu].apicid = proc->apicid;  // 為cpu分配apicid 也就是說apicid標識了唯一的cpu
        ncpu++;
      }
      p += sizeof(struct mpproc);
      continue;
    case MPIOAPIC:
      ioapic = (struct mpioapic*)p;
      ioapicid = ioapic->apicno;
      p += sizeof(struct mpioapic);
      continue;
    case MPBUS:
    case MPIOINTR:
    case MPLINTR:
      p += 8;
      continue;
    default:
      ismp = 0;
      break;
    }
  }
  if(!ismp)
    panic("Didn't find a suitable machine");

  if(mp->imcrp){
    // Bochs doesn't support IMCR, so this doesn't run on Bochs.
    // But it would on real hardware.
    outb(0x22, 0x70);   // Select IMCR
    outb(0x23, inb(0x23) | 1);  // Mask external interrupts.
  }
}

2. lapicinit(void)

這是在啟動多個aps之間要做的一些初始化操作。

下面用得最多的lapicw操作其實非常簡單

static void
lapicw(int index, int value)
{
	lapic[index] = value;
	lapic[ID];  // wait for write to finish, by reading
}

中斷流程:

  • 一個 CPU 給其他 CPU 發送中斷的時候, 就在自己的 ICR 中, 放中斷向量和目標LAPIC ID, 然後通過總線發送到對應 LAPIC,
  • 目標 LAPIC 根據自己的 LVT(Local Vector Table) 來對不同的中斷進行處理.
  • 處理完了寫EOI 表示處理完了.

所以下面的操作其實就是在初始化LVT表

void
lapicinit(void)
{
  if(!lapic)
    return;

  // Enable local APIC; set spurious interrupt vector.
  lapicw(SVR, ENABLE | (T_IRQ0 + IRQ_SPURIOUS));

  // The timer repeatedly counts down at bus frequency
  // from lapic[TICR] and then issues an interrupt.
  // If xv6 cared more about precise timekeeping,
  // TICR would be calibrated using an external time source.
  lapicw(TDCR, X1);
  lapicw(TIMER, PERIODIC | (T_IRQ0 + IRQ_TIMER));
  lapicw(TICR, 10000000);

  // Disable logical interrupt lines.
  lapicw(LINT0, MASKED);
  lapicw(LINT1, MASKED);

  // Disable performance counter overflow interrupts
  // on machines that provide that interrupt entry.
  if(((lapic[VER]>>16) & 0xFF) >= 4)
    lapicw(PCINT, MASKED);

  // Map error interrupt to IRQ_ERROR.
  lapicw(ERROR, T_IRQ0 + IRQ_ERROR);

  // Clear error status register (requires back-to-back writes).
  lapicw(ESR, 0);
  lapicw(ESR, 0);

  // Ack any outstanding interrupts.
  lapicw(EOI, 0);

  // Send an Init Level De-Assert to synchronise arbitration ID's.
  lapicw(ICRHI, 0);
  lapicw(ICRLO, BCAST | INIT | LEVEL);
  while(lapic[ICRLO] & DELIVS)
    ;

  // Enable interrupts on the APIC (but not on the processor).
  lapicw(TPR, 0);
}

3. startothers函數

尋到了有多少個 CPU,而且也有了每個 CPU 的標識信息,就可以去啟動它們了,直接來看 startothers 的代碼:

  1. entryother.S 是APs啟動時要運行的代碼,鏈接器將映像放在_binary_entryother_start然後將其移動到0x7000

  2. 然後利用for循環,循環啟動APs

  3. 最後調用lapicstartap() 函數來啟動 APs

static void
startothers(void)
{
  extern uchar _binary_entryother_start[], _binary_entryother_size[];
  uchar *code;
  struct cpu *c;
  char *stack;

  // Write entry code to unused memory at 0x7000.
  // The linker has placed the image of entryother.S in
  // _binary_entryother_start.
  code = P2V(0x7000);
  memmove(code, _binary_entryother_start, (uint)_binary_entryother_size);

  for(c = cpus; c < cpus+ncpu; c++){
    if(c == mycpu())  // We've started already.
      continue;

    // Tell entryother.S what stack to use, where to enter, and what
    // pgdir to use. We cannot use kpgdir yet, because the AP processor
    // is running in low  memory, so we use entrypgdir for the APs too.
    stack = kalloc();
    *(void**)(code-4) = stack + KSTACKSIZE;
    *(void(**)(void))(code-8) = mpenter;
    *(int**)(code-12) = (void *) V2P(entrypgdir);

    lapicstartap(c->apicid, V2P(code));

    // wait for cpu to finish mpmain()
    while(c->started == 0)
      ;
  }
}

4. lapicstartap函數

前面提到過BSP是通過發送信號給APs來啟動其他cpu的,簡單來說就是一個 CPU 通過寫 LAPIC 的 ICR 寄存器來與其他 CPU 進行通信

其實下面一大堆我覺得看不懂也沒差。。太低層了。只要知道下面這一點就行了

BSP通過向AP逐個發送中斷來啟動AP,首先發送INIT中斷來初始化AP,然後發送SIPI中斷來啟動AP,發送中斷使用的是寫ICR寄存器的方式

void
lapicstartap(uchar apicid, uint addr)
{
  int i;
  ushort *wrv;

  // "The BSP must initialize CMOS shutdown code to 0AH
  // and the warm reset vector (DWORD based at 40:67) to point at
  // the AP startup code prior to the [universal startup algorithm]."
  outb(CMOS_PORT, 0xF);  // offset 0xF is shutdown code
  outb(CMOS_PORT+1, 0x0A);
  wrv = (ushort*)P2V((0x40<<4 | 0x67));  // Warm reset vector
  wrv[0] = 0;
  wrv[1] = addr >> 4;

  // "Universal startup algorithm."
  // Send INIT (level-triggered) interrupt to reset other CPU.
  lapicw(ICRHI, apicid<<24);
  lapicw(ICRLO, INIT | LEVEL | ASSERT);
  microdelay(200);
  lapicw(ICRLO, INIT | LEVEL);
  microdelay(100);    // should be 10ms, but too slow in Bochs!

  // Send startup IPI (twice!) to enter code.
  // Regular hardware is supposed to only accept a STARTUP
  // when it is in the halted state due to an INIT.  So the second
  // should be ignored, but it is part of the official Intel algorithm.
  // Bochs complains about the second one.  Too bad for Bochs.
  for(i = 0; i < 2; i++){
    lapicw(ICRHI, apicid<<24);
    lapicw(ICRLO, STARTUP | (addr>>12));
    microdelay(200);
  }
}

5. 看一下entryother.S

上面的代碼總共就幹了兩件事,1.找到所有的cpu信息、2. 利用IPI中斷,也就是寫ICR寄存器來啟動其他的APS。

而啟動APs是要執行boot代碼的。對應的地址是0x7000。也就是entryother.S的代碼

這個代碼前面乾的事情和entry.S基本一樣的。需要注意的就是下面這兩行

# Switch to the stack allocated by startothers()
movl    (start-4), %esp
# Call mpenter()
call    *(start-8)
// Other CPUs jump here from entryother.S.
static void
mpenter(void)
{
  switchkvm(); //切換到內核頁表
  seginit(); // 初始化gdt
  lapicinit(); // 初始化apic
  mpmain(); 
}
static void mpmain(void)
{
  cprintf("cpu%d: starting %d\n", cpuid(), cpuid());
  idtinit();       // 加載GDT
  xchg(&(mycpu()->started), 1); // 將started置1表啟動完成了
  scheduler();     // 開始調度進程執行程序了
}

可以看到,這裏面所做的工作主要還是初始化建立環境,最後 CPU 這個結構體中的元素 started 置 1 表示這個 CPU 已經啟動好了,這裡就會通知 startothers 函數,可以啟動下一個 AP 了。最後就是調用 scheduler() 可以開始調度執行程序了。

下面是startothers中的一段while循環。

// startothers
  // wait for cpu to finish mpmain()
    while(c->started == 0)
      ;