深入剖析WebRTC事件機制之Sigslot

前言

我最早了解到 sigslot 大概是在 2007年 左右,當時在QT中大量使用了 sigslot 的概念。 現在 WebRTC 中也大量使用了 sigslot 這種機制來處理底層的事件。它對我們閱讀WebRTC程式碼至關重要。本篇文章就詳細介紹一下 sigslot。

Sigslot作用

Sigslot 的作用一句話表式就是為了解耦。例如,有兩個類 A 和 B,如果 B 使用 A, 就必須在 B 類中寫入與 A 類有關的程式碼。看下程式碼:

class A {  public:      void funcA();  }    class B {  public:      B(A& a){          m_a = a;      }        void funcB(){          m_a.funcA(); //這裡調用了A類的方法      }    private:      A m_a; //引用 A 類型成員變數。  }    void main(int argc, char *argv[]){      A a;      B b(a);      b.funcB();  }

這裡的弊端是 B 中必須要聲名使用 A。如果我們的項目特別複雜,這樣的使用方式在後期維護時很容易讓我們掉入「陷阱」。有沒有一種通用的辦法可以做到在 B 中不用使用 A 也可以調用 A 中的方法呢?答案就是使用 sigslot。我們看下面的程式碼:

class A : public sigslot::has_slot<>  {  public:      void  funcA();  };    class B  {  public:      sigslot::signal0<> sender;  };    void main(int argc, char *argv[]){        A a;      B b;        //在運行時才將 a 和 b 綁定到一起      b.sender.connect(&a, &A::funcA);      b.sender();    } 

通過上面的程式碼我們可以看到 B 中沒有一行與 A 相關的程式碼。只在 main 函數中(也就是在運行時)才知道 A 與 B 有關聯關係。是不是覺得很神奇呢?下面我們就看一下它的實現原理。

實現原理

sigslot的原理其實非常簡單,它就是一個變化的觀察者模式。觀察者模式如下所示:

觀察者模式,首先讓 Observer(「觀察者」)對象 註冊到 Subject(「被觀察者」) 對象中。當 Subject 狀態發生變化時,遍歷所有註冊到自己的 Observer 對象,並調用它們的 notify方法。

sigslot與觀察者模式類似,它使用signal(「訊號」)和slot("槽"),區別在於 signal 主動連接自己感興趣的類及其方法,將它們保存到自己的列表中。當發射訊號時,它遍歷所有的連接,調用 slot(「槽」) 方法。

如何使用

下面我們看一下 WebRTC 中是如何使用 sigslot 的。

  • 首先,定義 slot("槽"),也就是事件處理函數。在WebRTC中定義槽必須繼承 has_slots<>。如下圖所示:
  • 其次,定義 signal (「訊號」) ,也就是發送的訊號。 sigslot::signal1<AsyncSocket*, sigslot::multi_threaded_local> SignalWriteEvent;
  • 然後,將 signal 與 slot 連接到一起。在這裡就是將 AsyncUDPSocket和 OnWriteEvent方法與signal綁定到一起。 socket_->SignalWriteEvent.connect(this, &AsyncUDPSocket::OnWriteEvent);
  • 最後,發送訊號。在 WebRTC中根據參數的不同定義了許多 signal,如 signal1 說明帶一個參數,signal2說明帶兩個參數。 SignalWriteEvent(this);

關鍵程式碼

下面是對 sigslog 的類關係圖及關鍵程式碼與其詳細注釋。

...    // On our copy of sigslot.h, we set single threading as default.  #define SIGSLOT_DEFAULT_MT_POLICY single_threaded    #if defined(SIGSLOT_PURE_ISO) ||                         (!defined(WEBRTC_WIN) && !defined(__GNUG__) &&        !defined(SIGSLOT_USE_POSIX_THREADS))  #define _SIGSLOT_SINGLE_THREADED  #elif defined(WEBRTC_WIN)  #define _SIGSLOT_HAS_WIN32_THREADS  #if !defined(WIN32_LEAN_AND_MEAN)  #define WIN32_LEAN_AND_MEAN  #endif  #include "webrtc/rtc_base/win32.h"  #elif defined(__GNUG__) || defined(SIGSLOT_USE_POSIX_THREADS)  #define _SIGSLOT_HAS_POSIX_THREADS  #include <pthread.h>  #else  #define _SIGSLOT_SINGLE_THREADED  #endif    #ifndef SIGSLOT_DEFAULT_MT_POLICY  #ifdef _SIGSLOT_SINGLE_THREADED  #define SIGSLOT_DEFAULT_MT_POLICY single_threaded  #else  #define SIGSLOT_DEFAULT_MT_POLICY multi_threaded_local  #endif  #endif    // TODO: change this namespace to rtc?  namespace sigslot {    ...    //這面這大段程式碼是為了實現智慧鎖使用的。  //它會根據不同的平台初始化不同的互斥量,並調用不同的鎖函數。    //如果是 Window 平台  #ifdef _SIGSLOT_HAS_WIN32_THREADS  // The multi threading policies only get compiled in if they are enabled.    //如果是全局執行緒  class multi_threaded_global {   public:    multi_threaded_global() {      static bool isinitialised = false;        if (!isinitialised) {        InitializeCriticalSection(get_critsec());        isinitialised = true;      }    }      void lock() { EnterCriticalSection(get_critsec()); }      void unlock() { LeaveCriticalSection(get_critsec()); }     private:    CRITICAL_SECTION* get_critsec() {      static CRITICAL_SECTION g_critsec;      return &g_critsec;    }  };    //如果是本地執行緒  class multi_threaded_local {   public:    multi_threaded_local() { InitializeCriticalSection(&m_critsec); }      multi_threaded_local(const multi_threaded_local&) {      InitializeCriticalSection(&m_critsec);    }      ~multi_threaded_local() { DeleteCriticalSection(&m_critsec); }      void lock() { EnterCriticalSection(&m_critsec); }      void unlock() { LeaveCriticalSection(&m_critsec); }     private:    CRITICAL_SECTION m_critsec;  };  #endif  // _SIGSLOT_HAS_WIN32_THREADS    //非window平台  #ifdef _SIGSLOT_HAS_POSIX_THREADS  // The multi threading policies only get compiled in if they are enabled.    //如果是全局執行緒  class multi_threaded_global {   public:    void lock() { pthread_mutex_lock(get_mutex()); }    void unlock() { pthread_mutex_unlock(get_mutex()); }     private:    static pthread_mutex_t* get_mutex();  };    //如果是本地執行緒  class multi_threaded_local {   public:    multi_threaded_local() { pthread_mutex_init(&m_mutex, nullptr); }    multi_threaded_local(const multi_threaded_local&) {      pthread_mutex_init(&m_mutex, nullptr);    }    ~multi_threaded_local() { pthread_mutex_destroy(&m_mutex); }    void lock() { pthread_mutex_lock(&m_mutex); }    void unlock() { pthread_mutex_unlock(&m_mutex); }     private:    pthread_mutex_t m_mutex;  };  #endif  // _SIGSLOT_HAS_POSIX_THREADS    //根據不同的策略調用不同的鎖。  //這裡的策略就是不同的平台  template <class mt_policy>  class lock_block {   public:    mt_policy* m_mutex;      lock_block(mt_policy* mtx) : m_mutex(mtx) { m_mutex->lock(); }      ~lock_block() { m_mutex->unlock(); }  };    class _signal_base_interface;    class has_slots_interface {     ...     public:    void signal_connect(_signal_base_interface* sender) {      ...    }      void signal_disconnect(_signal_base_interface* sender) {      ...    }      void disconnect_all() { ... }  };    class _signal_base_interface {   ...     public:    void slot_disconnect(has_slots_interface* pslot) {      ...    }      void slot_duplicate(const has_slots_interface* poldslot,                        has_slots_interface* pnewslot) {      ...    }  };    // 該類是一個特別重要的類  // signal與slot綁定之前,必須先將槽對象與槽方法組成 connection  //  class _opaque_connection {   private:    typedef void (*emit_t)(const _opaque_connection*);      //聯合結構體,用於函數轉換    template <typename FromT, typename ToT>    union union_caster {      FromT from;      ToT to;    };      //訊號發射函數指針    emit_t pemit;    //存放「槽」對象    has_slots_interface* pdest;    // Pointers to member functions may be up to 16 bytes for virtual classes,    // so make sure we have enough space to store it.    unsigned char pmethod[16];     public:    //構造函數    //在構造connect時,要傳入槽對象和槽類方法指針    template <typename DestT, typename... Args>    _opaque_connection(DestT* pd, void (DestT::*pm)(Args...)) : pdest(pd) {      //定義成員函數指針,與C語言中的函數指針是類似的      typedef void (DestT::*pm_t)(Args...);      static_assert(sizeof(pm_t) <= sizeof(pmethod),                    "Size of slot function pointer too large.");        std::memcpy(pmethod, &pm, sizeof(pm_t));         //定義了一個函數指針      typedef void (*em_t)(const _opaque_connection* self, Args...);        //通過下面的方法,將 pemit 函數變理指向了 emitter 函數。      union_caster<em_t, emit_t> caster2;      //注意 emitter後面的是模版參數,不是函數參數,這裡不要弄混了。      caster2.from = &_opaque_connection::emitter<DestT, Args...>;      pemit = caster2.to;    }      //返回"槽"對象    has_slots_interface* getdest() const { return pdest; }      ...      //因為在構造函數里已經將 pemit 設置為 emitter 了,    //所以下面的程式碼就是調用 emitter 函數。為里只不過做了一次函數指針類型轉換。    //也就是說調用 connect 的 emit 方法,實際調的是 emitter。    template <typename... Args>    void emit(Args... args) const {      typedef void (*em_t)(const _opaque_connection*, Args...);      union_caster<emit_t, em_t> caster;      caster.from = pemit;      (caster.to)(this, args...);    }     private:    template <typename DestT, typename... Args>    static void emitter(const _opaque_connection* self, Args... args) {      //pm_t是一個成員函數指針,它指向的是傳進來的成員方法      typedef void (DestT::*pm_t)(Args...);      pm_t pm;      std::memcpy(&pm, self->pmethod, sizeof(pm_t));      //調用成員方法      (static_cast<DestT*>(self->pdest)->*(pm))(args...);    }  };    //signal_with_thread_policy類的父類。  //該類最主要的作用是存有一個conn list。  //在 signal_with_thread_policy中的connect方法就是對該成員變數的操作。    template <class mt_policy>  class _signal_base : public _signal_base_interface, public mt_policy {   protected:    typedef std::list<_opaque_connection> connections_list;     public:    ...     protected:    //在 _signal_base 中定義了一個connection list,用於綁定的 slots.    connections_list m_connected_slots;    ...    };    //該類是"槽"的實現  template <class mt_policy = SIGSLOT_DEFAULT_MT_POLICY>  class has_slots : public has_slots_interface, public mt_policy {   private:    typedef std::set<_signal_base_interface*> sender_set;    typedef sender_set::const_iterator const_iterator;     public:    has_slots()        : has_slots_interface(&has_slots::do_signal_connect,                              &has_slots::do_signal_disconnect,                              &has_slots::do_disconnect_all) {}     ...     private:    has_slots& operator=(has_slots const&);      //靜態函數,用於與signal綁定,由父類調用    //它是在構造函數時傳給父類的    static void do_signal_connect(has_slots_interface* p,                                  _signal_base_interface* sender) {      has_slots* const self = static_cast<has_slots*>(p);      lock_block<mt_policy> lock(self);      self->m_senders.insert(sender);    }      //靜態函數,用於解綁signal,由父類調用    //它是在構造函數時傳給父類的    static void do_signal_disconnect(has_slots_interface* p,                                     _signal_base_interface* sender) {      has_slots* const self = static_cast<has_slots*>(p);      lock_block<mt_policy> lock(self);      self->m_senders.erase(sender);    }      ...      private:    //該集合中存放的是與slog綁定的 signal    sender_set m_senders;  };    //該類是訊號的具體實現  //為了保證訊號可以在不同的平台是執行緒安全的,所以這裡使用了策略模式  //mt_policy參數表式的是,不同的平台使用不同的策略  //該類中有兩個重要的函數,一個是connect用於與槽進行綁定;另一個是 emit用於發射訊號    template <class mt_policy, typename... Args>  class signal_with_thread_policy : public _signal_base<mt_policy> {  public:      ...      template <class desttype>    void connect(desttype* pclass, void (desttype::*pmemfun)(Args...)) {      //這是一個智慧鎖,當函數結束時,自動釋放鎖。      lock_block<mt_policy> lock(this);      //先將對象與"槽"組成一個conn,然後存放到 signal的 conn list里      //當發射訊號時,調用 conn list中的每個conn的 emit方法。      this->m_connected_slots.push_back(_opaque_connection(pclass, pmemfun));        //在槽對象中也要保存 signal 對象。      pclass->signal_connect(static_cast<_signal_base_interface*>(this));    }      //遍歷所有的連接,並調用 conn 的emit方法。最終調用的是綁定"槽"的方法    void emit(Args... args) {      lock_block<mt_policy> lock(this);      this->m_current_iterator = this->m_connected_slots.begin();      while (this->m_current_iterator != this->m_connected_slots.end()) {        _opaque_connection const& conn = *this->m_current_iterator;        ++(this->m_current_iterator);          //調 conn 的 emit 方法,最終會調用綁定的 "槽" 方法。        conn.emit<Args...>(args...);      }    }      //重載()操作符,這樣就從直接調用emit方法變成隱含調用emit方法。    void operator()(Args... args) { emit(args...); }  };      //下面的對不同參數訊號的定義  template <typename... Args>  using signal = signal_with_thread_policy<SIGSLOT_DEFAULT_MT_POLICY, Args...>;      template <typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>  using signal0 = signal_with_thread_policy<mt_policy>;    template <typename A1, typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>  using signal1 = signal_with_thread_policy<mt_policy, A1>;    template <typename A1,            typename A2,            typename mt_policy = SIGSLOT_DEFAULT_MT_POLICY>  using signal2 = signal_with_thread_policy<mt_policy, A1, A2>;    ...    }  // namespace sigslot

小結

本文通過 sigslot作用、實現原理、如何使用以及詳細的程式碼注釋四個部分剖析了 WebRTC 中的 sigslot。sigslot是 WebRTC中非常底性的基礎程式碼,它對 WebRTC 事件機制起著關鍵性的作用。熟悉sigslot,對我們閱讀 WebRTC 程式碼會有非常大的幫助。