Dempster–Shafer theory(D-S證據理論)初探
1. 證據理論的發展歷程
Dempster在1967年的文獻《多值映射導致的上下文概率》中提出上、下概率的概念,並在一系列關於上下概率的文獻中進行了拓展和應用,其後又在文獻《貝葉斯推理的一般化》中進一步探討了不滿足可加性的概率問題以及統計推理的一般化問題。
Shafer在Dempster研究的基礎上提出了證據理論,把Dempster合成規則推廣到更為一般的情況,並與1976年出版《證據的數學理論》,這一著作的出版標誌著證據理論真正的誕生,為了紀念兩位學者對證據理論所做的貢獻,人們把證據理論稱為Dempster-Shafer證據理論(D-S證據理論)。
自從證據理論誕生以來,在將近四十年的發展中,很多學者對證據理論進行了較為廣泛的研究,也取得了豐碩的理論研究結果,D-S證據理論的應用性得到了廣泛的證實。
2. 為什麼需要D-S證據理論
證據理論自1976年美國學者G.Shafer發表了著作《證據的數學理論》以來,在理論上取得了很大的發展,在應用上也取得了豐富的成果。
證據理論在多分類器融合、不確定性推理、專家意見綜合、多準則決策、模式識別、綜合診斷等領域中都得到了較好的應用。
證據理論基於人們對客觀世界的認識,根據人們掌握的證據和知識,對不確定性事件給出不確定性度量。這樣做使得不確定性度量更切近人們的習慣,易於使用。
證據理論對論證合成給出了系統的合成公式,使多個證據合成後得到的基本可信數依然滿足證據基本可信數的性質。
Dempster合成公式具有結合律與交換律,使其有利於實現計算和選擇合成效果好的、資訊品質高的資訊源。
0x1:各個領域遇到的不確定性證據和證據衝突問題
1、自動駕駛領域中,多感測器資訊融合問題
目前感測器採集到的數據都是帶有雜訊的,比如:
- LiDAR採集的點雲的內部雜訊:
- 收集的距離誤差
- intensity校準
- 時間對齊
- 點雲外部雜訊:
- 定位誤差
- 運動過程中所帶來的點雲位置畸變
- 雲天霧天的鬼影等等
- Radar採集中的雜訊:
- 相對於Radar坐標系下橫向速度不準
- 標定不準確(稀疏點無法很好地和車體坐標系進行非常精確的標定)
- 運動補償不準確帶來的誤差
- 相機採集的圖片中的雜訊:
- occlusion
- 逆光
- 低光(HDR可以解決亮度部分問題的同時也會帶來新的誤差)
- 標定
- 同步等等
一個典型的問題是,現在自動駕駛車在行駛過程中,LiDAR檢測到了某個位置有個障礙物,Radar沒有檢測到,影像沒有檢測到,那麼在這多種感測器的數據源下,我們需要進行一個決策,該位置有沒有障礙物,最好可以給出這種決策的不確定性區間是多少。
2、網路安全入侵檢測領域中,多源日誌和多源告警的融合決策問題
3、數據分析中,不同的分類器的輸出結果的綜合決策問題
暫無
3. D-S證據理論基本概念
0x1:從概率的四種解釋說起
- 客觀解釋(頻率解釋):概率描述了一個可以重複出現的事件的客觀事實,即該事件可重複出現的頻率,如擲骰子。
- 個人主義解釋(主觀解釋、貝葉斯解釋):概率反映了個人主義的一種偏好,是個人的主觀意願作用的結果,如賭博。
- 必要性解釋(邏輯主義解釋):把概率看成命題與命題之間聯繫程度的度量。這種聯繫是純客觀的,與人的作用無關,演繹推理是概率推理的一個特例。
- 構造性(constructive)解釋:Shafer指出以上三種解釋都沒有涉及概率推斷的構造性特性,證據理論給出概率一種新的構造性解釋,認為概率是某人在證據的基礎上構造出的他對一命題為真的信任程度(degree of belief),簡稱信度。
前三種概率滿足可加性,即:
根據可加性可知,如果我們相信一個命題為真的程度為S,那麼我們必須以 1-S 的程度去相信該命題的反。但是在很多情況下,這是不合理的,例如:」地球以外存在著生命「這一命題,其反命題是:」地球以內不存在生命「。
在實現世界的大部分時候,證據與證據之間都是存在著交疊與衝突的,並不是完美的.i.i.d.關係。
Daldey的不可能性定理指出,在兩個基本假設下,不存在一種集結個體概率估計的數學公式滿足概率定理。這兩個假設如下:
- 個體的概率估計Pi(E)是獨立隨機變數
- 不論給定的事件E發生或不發生,Pi(E)是獨立隨機變數
也就是說,如果群體概率估計是個體概率估計的函數,則群體概率估計不滿足概率定律,這意味著基於概率的集結模型在Daldey不可能性定理的條件下都將失效。
0x2:證據、識別框架及信度函數
1、證據在D-S理論中的定義
在證據理論中,證據指的是人們分析命題求其基本可信數所依據的事物的屬性與客觀環境(實證據),還包括人們的經驗、知識和對該問題所做的觀察和研究。
人們通過對證據的分析得出命題的基本可信數分配函數:m(A)。
筆者注:
D-S理論中的證據概念,是從概率論中隨機變數的概念中推廣而來的。
2、識別框架
對一個判決問題,設人們所能認識到的可能結果用集合Θ表示,那麼人們所關心的任一命題都對應於Θ的一個子集。Θ被稱為識別框架。
識別框架依賴於人們的認知水平,框架中的元素由人們」已經知道「或」想知道「的知識決定。
筆者注:
識別框架是一個更泛化的理論概念,在經典概率理論中被稱為樣本空間。
3、命題與基本可信度分配(Mass)
設Θ為識別框架,A為識別框架下的一個命題,如果集函數 m:2Θ → [0,1] 滿足:
則稱 m 為框架Θ上的基本可信度分配。m(A) 被稱為 A 的基本可信數。
基本可信數反映了對 A 本身(而不去管它的任何真子集與前因後果)的信度大小。
筆者注:
命題表達了我們對待認識的目標對象(識別框架)的一種潛在推測,每一個命題都是識別框架的一個子集,對應一個對現實問題的抽象表徵。
基本可信度分配是概率論中完備性的一個泛化推廣。基本可信數累加和為1,代表著所有命題共同組合在一起,構成了完整的識別框架。
需要注意的是,和隨機事件一樣,命題本身是一個集合的概念(離散情況下),所以命題可以有子命題,對命題的可信度分配,同樣也有子集的概念。
4、信度函數(Belief)
設Θ為識別框架,集函數 m:2Θ → [0,1] 為框架Θ上的基本可信度分配,則稱由:
所定義的函數 Bel:2Θ → [0,1] 為Θ上的信度函數。信度函數表達了對每個命題的信度(考慮前因後果)。
筆者注:
可以看到,信度函數是一系列可信度分配的累計合成結果,這和概率論中隨機變數是單個離散隨機事件(離散概率)的累計的概念是一致的。
5、信度函數的半可加性
設Θ為識別框架,集函數 Bel:2Θ → [0,1] 是信度函數,當且僅當它滿足:
,
,n為任意自然數
則有下式:
上式為半可加性應滿足的條件,滿足可加性則一定滿足半可加性。
前面說過,貝葉斯信度函數是一種特殊的信度函數,它的所有焦元都是單元素的,所以貝葉斯信度函數是滿足完全可加性的。
6、信度函數的焦元
一批證據對一個命題提供支援的話,那麼它也應該對該命題的推論提供同樣的支援(互資訊原理)。所以, 對一個命題的信度應該等於證據對它的所有前提本身提供的支援度之和。
如果 m(A) > 0,則稱 A 為信度函數 Bel 的焦元,所有焦元的合併成為它的核心。
本質上說,焦元就是信度函數非零的命題。
幾種特殊的信度函數如下:
1)絕對信度函數
稱 m(A) 為絕對信度函數,其中 A* ∈ Θ。
2)空信度函數
對一個信度函數而言,如果 m(Θ) = 1,我們將其成為空信度函數。
3)Bayesian信度函數
如果一個信度函數的焦元都為單元素集,我們將其成為 Bayesian信度函數,Bayesian信度函數即為概率函數。
筆者注:
貝葉斯理論最核心的假設就是元素(證據)之間是.i.i.d.獨立同分布假設,這個假設在實際問題中很多時候是不成立的。 證據合成理論最大的理論貢獻就是在可信度分配的新理論框架下,將證據之間的衝突性問題納入了考慮和計算範圍。
7、眾信度函數(Q)
設函數 Q:2Θ → [0,1],且滿足下式:
則 Q 被稱為眾信度函數。眾信度函數 Q(A) 反映了包含A的集合的所有基本可信數之和。
筆者注:
信度函數 Bel 是從一個結論的前提這個角度來描述信度的,它計算的結論的前提(也就是證據)的累計合併結果;而相比不同的是,眾信度函數 Q 是從一個前提的結論這個角度來描述信度的。
- 信度函數 Bel:結論的前提的累計合併結果,承前。
- 眾信度函數 Q:前提的結論的累計合併結果,啟後。
8、Bel信度函數和Q眾信度函數概念的互相定義
設 Bel:2Θ → [0,1] 為Θ上的信度函數,Q 是它的眾信度函數,那麼,
該定理說明,Bel 與 Q 可以互相定義,眾信度函數從另一個側面反映了信度。
Dempster合成法則是證據理論的核心,是一個反映證據的聯合 作用的法則,該法則可對不同證據源的證據進行合成。
9、似真度函數(plausibility function)
設函數 pl:2Θ → [0,1],且滿足下式:
我們稱 pl 為似真度函數(plausibility function)。pl(A) 稱為 A 的似真度,表示我們不懷疑 A 的程度。pl(A) 包含了所有與 A 相容的那些集合(命題)的基本可信度。pl(A) 是比 Bel(A) 更寬鬆的一種估計。
10、信度函數合成的Dempster法則
1)雙信度函數合成的Dempster法則
設 Bel1 和 Bel2 是同一識別框架Θ上的兩個信度函數,m1 和 m2 分別是其對應的基本信度分配,焦元分別為 A1,A2,….,Ak 和 B1,B2,…,Bl,設:
則由下式定義的函數 m:2Θ → [0,1] 是基本信度分配:
並稱此為兩個信度函數合成的Dempster法則。
由此得到的合成後的基本信度分配m被稱為 m1 和 m2 的直和。記為。所對應的信度函數也被稱為 Bel1 和 Bel2 的直和,記為
。
(1 – K)-1 稱為歸一化因子,它的引入實際上是為了避免證據組合時將非零的信度賦給空集,把空集所丟棄的信度分配按比例地補到非空集上,K表示證據間的衝突程度,其值越大說明證據之間的衝突越大。
當 K=1 時,則 m1 和 m2 完全衝突,直和或
不存在。
2)多信度函數合成的Dempster法則
設 Bel1,Bel2,…,Beln 是同一識別框架Θ上的信度函數,m1,m2,….,mn 是對應的基本信度分配,如果存在且腳本可信度分配為m,且
,則有:
,其中
上式即多個信度函數合成的Dempster法則。
上面公式可以這麼理解,命題的合成結果A,等於所有參與合併的命題中,相交於 A 的那些命題的的mass函數值的乘積的和,再除以一個歸一化係數 1-K。歸一化係數 1-K 中的 K 的含義是證據之間的衝突(the conflict between the evidences, called conflict probability)。
對於歸一化係數K來說:
- 一個極端的情況就是,多個命題背後的支援證據彼此間完全衝突,即完全不相交,則Kn無限大,導致最終合成結果為零
- 另一個極端情況是,多個命題背後的支援證據完全重合,即完全相交,則Kn=1,則最終合成結果就等於各個子命題的累乘結果,這其實就是貝葉斯理論的假設前提
- 一般情況下,多個命題背後的支援證據存在部分重合,同時也有各自獨立的貢獻部分,換句話說,不同命題之間存在相關性
筆者注:
從信度函數合成的Dempster法則公式可以看出,Dempster合成公式是在貝葉斯概率分解公式的基礎上,增加了對證據間衝突性的考慮,對彼此存在衝突的證據合成起到歸一化的作用。
3)Dempster法則的數學性質
- 交換律:
,證據合成並不依賴於合成的順序。
-
結合律:
,證據合成次序不影響合成結果。
-
同一性:存在唯一的幺元m0,對所有的基本信度分配m,有
,其中m0是空基本信度分配,即 m0(Θ) = 1,m0(其他) = 0,該性質在現實世界中的意義是某些專家不表態(棄權)時不影響最終結果。
-
極化性:
,其中
表示一種」大於「或」放大「,表示意見相同的地位專家合成的效果是」支援的假設更支援,否定的假設更否定「,向兩級增幅地發展。
-
證據聚焦性:當兩個證據都均以較高的信度支援識別框架中的某一命題時,合成後該命題將具有更高的信度。
11、信任區間
[Bel(A),pl(A)]:表示命題A的信任區間,
- Bel(A):表示似真函數的下限
- pl(A):表示似真函數的上限
例如(0.25,0.85),表示A為真有0.25的信任度,A為假有0.15的信任度,A不確定度為0.6。
0x3:框架的轉化
一般來說,對任何一個證據處理人員來講,他所想到的各種有用的觀念、判斷,是非常多的,某一框架只能包含其中很少的一部分,所以用單個識別框架來處理問題是很不夠的。這種情況下證據處理人員為了進行某種特殊的似真推理,就有必要求助於不同的觀念,側重於不同的特性相應地轉換他所使用的識別框架。
粗化與細化就是為適應這個要求而採用的兩種轉換方法。值得說明的是,框架的轉化不是隨意的,儘管轉化前後兩者要強調不同的特性、不同的側面,而且在其關注的方向上可以具有不同程度的判決,但是兩者決不能使用相互矛盾的、互不相容的概念。
細化與粗化就是兩種相容的變換,而收縮與擴張就不是相容變換。
1、框架的劃分
設Θ是識別框架,δ是Θ的一些子集所構成的集合,滿足:
則δ被稱為Θ的一個劃分。
2、細化與粗化
δ1和δ2都是Θ的劃分,對都存在
使
,則稱δ1是δ2的加細(細分),δ2是δ1的加粗(粗化)。記為
,
意味著δ2的每一個元素都是δ1的一些元素的並。
對於Θ的任意兩個劃分δ1和δ2,可以定義另一個
仍是Θ的一個劃分,且有:
若有另一個劃分δ3,滿足:
則有:
由以上定理可知,我們所定義的劃分既是δ1的加細,又是δ2的加細,而且在既是δ1的加細,又是δ2的加細劃分中,
又是最粗的,所以稱
是δ1和δ2最粗的公共加細。讀者朋友通過畫一個簡單的韋恩圖,基於集合論的知識就很容易理解這個性質了。
在Θ的所有劃分中,最粗的劃分是{Θ},即只包含一個元素即Θ本身的一個集合;最細的劃分是{{θ} θ∈Θ},即Θ的單點子集的集合。
3、框架相容概念
對框架的細分來講,是否存在一個極細化的精細框架(ultimate refinement),Shafer理論和貝葉斯理論有著不同的看法:
- 貝葉斯理論假定存在一個不能再細分的框架
- 而Shafer認為,任何一個被細分了的框架都可以繼續進行細分
所以在Shafer理論中,有如下的兩個框架相容的概念:
定理1:
設Θ1和Θ2為兩個識別框架,若這兩框架具有一個公共的精細化框架,則稱框架Θ1和Θ2是相容的。
顯然,根據這個定理,在兩個框架中,當其中一個是另一個的精細時,這兩個框架必相容。
定理2:
如果在一個框架Θ上加上一個新假設,則Θ的元素必定要減少,設Θ1是由Θ加上一個新假設而得到的識別框架,則Θ1作為可能事件或結果的一個集合,將是可能結果Θ的一個子集。
此時,我們稱Θ1是Θ的一個收縮,Θ是Θ1的一個擴張。
筆者注:
識別框架的收縮與擴張,其本質就是概率論中假設空間放縮的一個推廣,在貝葉斯理論中,通過引入先驗知識,可以顯著降低潛在的假設搜索空間,從而讓概率評估結果更加符合人們對目標對象的事實依據。
Relevant Link:
https://www.hindawi.com/journals/js/2016/1903792/
4. D-S證據融合演算法過程
這一章節,我們通過幾個具體的例子,來具體看下,D-S證據融合理論與演算法是如何對現實世界的問題進行建模與分析的。
0x1:在光感測探測器的多探測器結果融合問題中,D-S證據融合演算法的建模過程
- Null
- Red
- Yellow
- Green
- Red or Yellow
- Red or Green
- Yellow or Green
- Any
以及這些假設相應的可能性(也就是說基本分配函數Mass)。
Hypothesis | Mass | Belief | Plausibility |
---|---|---|---|
Null | 0 | 0 | 0 |
Red | 0.35 | 0.35 | 0.56 |
Yellow | 0.25 | 0.25 | 0.45 |
Green | 0.15 | 0.15 | 0.34 |
Red or Yellow | 0.06 | 0.66 | 0.85 |
Red or Green | 0.05 | 0.55 | 0.75 |
Yellow or Green | 0.04 | 0.44 | 0.65 |
Any | 0.1 | 1.0 | 1.0 |
- 每個假設的mass函數值都在0和1之間;
- 空集的mass函數值為0,即另外其他的假設mass值得和為1;
- 在所有假設中,使得mass值大於0的命題(例如A),稱為焦元(Focal element)
- m(Null) = 0
- m(Red) + m(Yellow) + m(Green) + … + m(Any) = 1
- 命題A為:Red,那麼 Bel(A) = 0.35,因為只有它本身是屬於假設 A
- 命題A為:Red or Yellow,那麼 Bel(Red or Yellow) = m(Null) + m(Red) + m(Yellow) + m(Red or Yellow) = 0 + 0.35 + 0.25 + 0.06 = 0.66
0x2:在法律目擊證人問題中,D-S證據融合演算法的Zadeh悖論問題
一宗謀殺案有三個犯罪嫌疑人:U = {Peter, Paul, Mary}。兩個目擊證人(W1,W2)分別指證犯罪嫌疑人,得到兩個mass函數 m1 和 m2。

由此,我們得到了如上表所示的組合函數 m12。

0x3:在自動駕駛問題中,D-S證據融合演算法對多感知器的融合作用
以自動駕駛障礙物檢測數據融合為例。
首先確定假設空間,對於自動駕駛場景來說,某一個時刻,對一個具體空間的情況,會有一些命題假設:
- 有障礙物A
- 無障礙物B
- 有障礙物或者無障礙物C
- 空集D(既不是有障礙物,也不是無障礙物,就是沒法判斷)四種情況
第二步確定基本分配概率,即確定每個證據源對於命題空間中每種命題假設發生的概率分布。
比如LiDAR作為證據源之一,在某個時刻對某個位置各個假設的分配概率為:
- A的概率是0.6
- 假設B的概率0.3
- 假設C的0.1
- 假設D的概率為0
分別用可信數來表示如下:
而影像感知器作為另一個證據源之一,在某個時刻對某個位置各個假設的分配概率為:
- A的概率是0.1
- 假設B的概率0.8
- 假設C的0.1
- 假設D的概率為0
分別用可信數來表示如下:
m(A)image = 0.1,m(B)image = 0.8,m(C)image = 0.1,m(C)image = 0
LiDAR和Image證據,對各個命題假設的置信度分配矩陣如下:
第三步是計算歸一化常數(1 – K)(K是衝突因子,表示兩個集合的衝突性)。
1 – K = 0.51
第三步計算每個命題假設的聯合mass:
第四步計算每個命題假設的信度函數(Bel)和似真概率(pl):
命題假設 | mL | mI | m{L,I} | bel | pl |
A | 0.6 | 0.1 | 0.275 | 0.275 | 0.295 |
B | 0.3 | 0.8 | 0.706 | 0.706 | 0.726 |
C | 0.1 | 0.1 | 0.020 | 1 | 1 |
D | 0 | 0 | 0 | 0 | 0 |
0x4:D-S證據合成中,難以辨識模糊程度

0x5:D-S證據合成中,基本概率分配函數的微小變化會使組合結果產生急劇變化
https://blog.csdn.net/zfcjhdq/article/details/51566875 https://www.cnblogs.com/mmgf/p/8025261.html https://blog.csdn.net/wsyhawl/article/details/78261554 https://www.ixueshu.com/document/889d1a6efed6edd8933d7c8c81103cda318947a18e7f9386.html https://blog.csdn.net/wsyhawl/article/details/78261554 https://blog.csdn.net/zfcjhdq/article/details/51566875 https://www.cnblogs.com/mmgf/p/8025261.html https://blog.csdn.net/zfcjhdq/article/details/51566875 https://blog.csdn.net/am45337908/article/details/48832947 https://zhuanlan.zhihu.com/p/99517591 http://dy.163.com/v2/article/detail/ELQG2DUV05311NDJ.html
5. 證據的決策規則
在前面的章節中,我們討論了基本的證據融合方法以及對命題假設的似真度估計等問題。
這個章節開始,我們繼續深入現實世界的複雜場景中,來討論一下我們如何基於證據融合進行未來決策。
在決策分析中,系統的未來狀態在一般情況下難以精確地給出。一般專家根據其掌握的知識與經驗對系統狀態進行分析,給出系統未來狀態的主觀估計。
當利用專家群體對系統未來狀態進行估計時,就需要對專家群體的預測意見進行合成。
設決策集為:
D = {d1,d2,….,dm}
在不同的系統狀態下,選擇不同的決策,所獲得的報酬(收益)是不同的。第 i 個系統狀態,第 j 個決策方案對應的報酬函數為:
r(di,xj),(i = 1,….,m;j = 1,….,n)
在已知上述條件下,如何進行決策,是基於證據理論進行決策要解決的問題。
0x1:基於證據理論的傳統證據決策規則
1、基於似真度函數求出各種系統狀態的主觀概率,然後求出各個決策方案的期望報酬
其過程可以表示如下:
利用似真度函數計算公式:
求出單點似真度:
令:
則 p(x1),…,p(xn) 即可作為各種狀態出現的主觀概率。
選擇與期望報酬最大值對應的決策方案為最佳方案,即:
使用這種決策方案在應用時,應該注意它的假設前提:
它認為似真度越大,其主觀概率就越大。但是一般情況下,點函數 pl({xi}) 不能表示基本可信數 m 或信度函數 Bel 所包含的所有資訊,點函數 pl({xi}) 越大,m({xi}) 與 Bel({xi}) 不一定越大。
2、M決策法
該方法的思想是先求出各焦元的報酬函數,然後用基本可信數作為主觀概率,求出各決策方案的期望報酬作為決策依據。
其過程可以表示如下,令:
則有:
選擇與期望報酬最大值對應的決策方案為最佳方案。
使用這種決策方案在應用時,應該注意它的假設前提:
該方法計算對應於焦元的報酬函數,該計算公式是對焦元所包含的狀態因素的報酬數值求簡單平均值。這也就意味著該方法認為每個焦元內包含的各狀態因素其出現的概率是相同的,而一般情況下,各狀態出現的概率是不相等的。
0x2:基於焦元分析求解各狀態的基本可信數的決策方法
上一個小節末我們說到,由於上述兩個假設的存在,使上述兩種決策方法存在著局限性。要科學地解決該問題,就是要解決如何在已知各焦元的基本可信度分配的條件下,求解系統各狀態的發生概率或基本可信數。
因此有學者提出了,「基於焦元分析求解各狀態的基本可信數的決策方法」,對群體專家意見合成得到的基本可信數的焦元進行分析,可分為兩種情況來處理,
- 第一種情況是已知所有系統狀態因素的基本可信數,即已知 m({xi}),i=1,….,n
- 第二種情況是未知所有系統狀態因素的基本可信數
1、第一種情況
系統各狀態的基本可信數作為分析計算各狀態發生概率的依據。此時 m(Θ) 可能不等於零。m(Θ) 表示對證據的懷疑。
當對系統各狀態的基本可信數充分信任時,可用下式定義各狀態發生概率:
當對系統各狀態的基本可信數有懷疑時,則可將 m(Θ) 均分給各個狀態,即可用下式定義各狀態發生概率:
2、第二種情況
用逐步求精的方法對基本可信數的焦元進行分析求解構成焦元的狀態因素的基本可信數。
首先,對由單個狀態因素構成的焦元確定該單個狀態因素的基本可信數就是與該焦元對應的基本可信數。
其次,對焦元中含有已知基本可信數的單個狀態因素的焦元,可將已知基本可信數的單個狀態因素及其基本可信數中提出,這樣可以縮小等待進一步分配的焦元所含構成狀態因素的數量,並可能縮小未知基本可信數的單個狀態因素的數量。
最好,對還不能確定基本可信數的單個狀態因素,就要引入新的資訊進行判斷與求解。
例如,已知:
則可首先確定:
其次可確定:
與
接下來,對 m({x3}) 和 m({x4}) 就要引入新的資訊來確定,可向專家諮詢:「已知 m({x3, x4}) = 0.2,m({x3}), m({x4})應為多少?」,專家可根據自己的知識,做出進一步的判斷。將專家的意見進行合成後,就可得到 m({x3}) 和 m({x4}) 的數值。
當求解出系統各狀態的基本可信數 m({xi}) 後,就可用第一種情況下的方法求解各狀態發生概率。
求解出各系統狀態的發生概率後,計算各個決策方案的期望報酬,選擇出最優決策方案,其計算公式如下:
0x3:基於粗糙集的證據決策規則分析
可進一步把證據理論的合成結果轉化為規則,在合成結果中發生概率最大的某種狀態,就是將要發生的結果。
經過轉化後,歷史樣本就可以看做決策表,可返回歷史樣本對各個合成規則的不確定性進行分析與評價。
6. D-S證據融合理論的編碼示例
0x1:a Python implementation for the Dempster–Shafer concept


""" Shows different use cases of the library. """ from __future__ import print_function from pyds import MassFunction from itertools import product print('=== creating mass functions ===') m1 = MassFunction({'ab':0.6, 'bc':0.3, 'a':0.1, 'ad':0.0}) # using a dictionary print('m_1 =', m1) m2 = MassFunction([({'a', 'b', 'c'}, 0.2), ({'a', 'c'}, 0.5), ({'c'}, 0.3)]) # using a list of tuples print('m_2 =', m2) m3 = MassFunction() m3['bc'] = 0.8 m3[{}] = 0.2 print('m_3 =', m3, ('(unnormalized mass function)')) #print('\n=== belief, plausibility, and commonality ===') #print('bel_1({a, b}) =', m1.bel({'a', 'b'})) #print('pl_1({a, b}) =', m1.pl({'a', 'b'})) #print('q_1({a, b}) =', m1.q({'a', 'b'})) #print('bel_1 =', m1.bel()) # entire belief function #print('bel_3 =', m3.bel()) #print('m_3 from bel_3 =', MassFunction.from_bel(m3.bel())) # construct a mass function from a belief function #print('\n=== frame of discernment, focal sets, and core ===') #print('frame of discernment of m_1 =', m1.frame()) #print('focal sets of m_1 =', m1.focal()) #print('core of m_1 =', m1.core()) #print('combined core of m_1 and m_3 =', m1.core(m3)) #print('\n=== Dempster\'s combination rule, unnormalized conjunctive combination (exact and approximate) ===') #print('Dempster\'s combination rule for m_1 and m_2 =', m1 & m2) #print('Dempster\'s combination rule for m_1 and m_2 (Monte-Carlo, importance sampling) =', m1.combine_conjunctive(m2, sample_count=1000, importance_sampling=True)) #print('Dempster\'s combination rule for m_1, m_2, and m_3 =', m1.combine_conjunctive(m2, m3)) #print('unnormalized conjunctive combination of m_1 and m_2 =', m1.combine_conjunctive(m2, normalization=False)) #print('unnormalized conjunctive combination of m_1 and m_2 (Monte-Carlo) =', m1.combine_conjunctive(m2, normalization=False, sample_count=1000)) #print('unnormalized conjunctive combination of m_1, m_2, and m_3 =', m1.combine_conjunctive([m2, m3], normalization=False)) #print('\n=== normalized and unnormalized conditioning ===') #print('normalized conditioning of m_1 with {a, b} =', m1.condition({'a', 'b'})) #print('unnormalized conditioning of m_1 with {b, c} =', m1.condition({'b', 'c'}, normalization=False)) #print('\n=== disjunctive combination rule (exact and approximate) ===') #print('disjunctive combination of m_1 and m_2 =', m1 | m2) #print('disjunctive combination of m_1 and m_2 (Monte-Carlo) =', m1.combine_disjunctive(m2, sample_count=1000)) #print('disjunctive combination of m_1, m_2, and m_3 =', m1.combine_disjunctive([m2, m3])) #print('\n=== weight of conflict ===') #print('weight of conflict between m_1 and m_2 =', m1.conflict(m2)) #print('weight of conflict between m_1 and m_2 (Monte-Carlo) =', m1.conflict(m2, sample_count=1000)) #print('weight of conflict between m_1, m_2, and m_3 =', m1.conflict([m2, m3])) #print('\n=== pignistic transformation ===') #print('pignistic transformation of m_1 =', m1.pignistic()) #print('pignistic transformation of m_2 =', m2.pignistic()) #print('pignistic transformation of m_3 =', m3.pignistic()) print('\n=== local conflict uncertainty measure ===') print('local conflict of m_1 =', m1.local_conflict()) print('entropy of the pignistic transformation of m_3 =', m3.pignistic().local_conflict()) #print('\n=== sampling ===') #print('random samples drawn from m_1 =', m1.sample(5, quantization=False)) #print('sample frequencies of m_1 =', m1.sample(1000, quantization=False, as_dict=True)) #print('quantization of m_1 =', m1.sample(1000, as_dict=True)) #print('\n=== map: vacuous extension and projection ===') #extended = m1.map(lambda h: product(h, {1, 2})) #print('vacuous extension of m_1 to {1, 2} =', extended) #projected = extended.map(lambda h: (t[0] for t in h)) #print('project m_1 back to its original frame =', projected) #print('\n=== construct belief from data ===') #hist = {'a':2, 'b':0, 'c':1} #print('histogram:', hist) #print('maximum likelihood:', MassFunction.from_samples(hist, 'bayesian', s=0)) #print('Laplace smoothing:', MassFunction.from_samples(hist, 'bayesian', s=1)) #print('IDM:', MassFunction.from_samples(hist, 'idm')) #print('MaxBel:', MassFunction.from_samples(hist, 'maxbel')) #print('MCD:', MassFunction.from_samples(hist, 'mcd'))
View Code
這個程式碼示例涵蓋了D-S證據理論主要的定理公式,讀者朋友可以通過打開注釋以及debug單步調試,幫助我們更好地從程式碼層面理解抽象的數學公式。
Relevant Link:
https://github.com/reineking/pyds
0x2:simple ADS clickfraud detection


# -*- coding: utf-8 -*- """ This script implement algorithm from THESIS "Automatic Detection of Click Fraud in Online Advertisements" AGARWAL Calculate the belief of fraud http://repositories.tdl.org/ttu-ir/bitstream/handle/2346/46429/AGARWAL-THESIS.pdf Import library using pyds (https://github.com/reineking/pyds) Require: Python3, numpy, scipy, redis (https://github.com/andymccurdy/redis-py) Author: TrucLK """ from pyds import MassFunction from itertools import product import redis import sys import pprint import time ################################################ class Config(object): """ Redis for store click history """ redis_host = '127.0.0.1' redis_port = 6379 redis_db = 0 # Time for redis key expire time_to_expire = 1800 # Visitor length of checking visit_length = 1800 # weight is an empirically derived value that signifies the strength of the evidence in supporting the user is fraud # caution changing it will change the maximum value of result. IDWeight = 0.5 UAWeight = 0.4 IPWeight = 0.4 class EcLogger(object): """ This is object relate to log """ def __init__(self): self.ro = redis.Redis(host=Config.redis_host, port=Config.redis_port, db=Config.redis_db) def record(self, hit): """ Add click to history for checking """ clickid = self.ro.incr('ec:clicknum') self.ro.zadd('click:ip:' + hit.ip, clickid, int(hit.time)) self.ro.expire('click:ip:' + hit.ip, Config.time_to_expire) self.ro.zadd('click:cookie:' + hit.cookie, clickid, int(hit.time)) self.ro.expire('click:cookie:' + hit.cookie, Config.time_to_expire) self.ro.zadd('click:config:' + hit.pubid + ':' + hit.config, clickid, int(hit.time)) self.ro.expire('click:config:' + hit.pubid + ':' + hit.config, Config.time_to_expire) pass def getClickNumFromIp(self, hit): """ Get click time from this IP address in this session """ count = 0 count = self.ro.zcount('click:ip:' + hit.ip, int(hit.time) - Config.visit_length, int(hit.time)) if (count == 0): count = 1 return count def getClickNumFromCookie(self, hit): """ Get click time from this anonymous id in this session """ count = self.ro.zcount('click:cookie:' + hit.cookie, int(hit.time) - Config.visit_length, int(hit.time)) if (count == 0): count = 1 return count def getClickNumFromConfig(self, hit): """ Get cliock time from this user agent in this session """ count = self.ro.zcount('click:config:' + hit.pubid + ':' + hit.config, int(hit.time) - Config.visit_length, int(hit.time)) if (count == 0): count = 1 return count class Hit(object): """ It's a simple container. """ def __init__(self, **kwargs): for key, value in kwargs.items(): setattr(self, key, value) super(Hit, self).__init__() class ClickProcessingUnit(object): """ Mass functions for Click Fraud Detection """ def __init__(self, hit, ecLogger): self.hit = hit self.ecLogger = ecLogger class ClickProcessingUnitIp(ClickProcessingUnit): """ Evidence number of clicks on the ad by ip address Create mass function for ip address checking """ def process(self): numberclick = self.ecLogger.getClickNumFromIp(self.hit) coefficient_value = Config.IPWeight a = coefficient_value * (1 - 1 / numberclick) b = 0 ab = 1 - a massfunction = MassFunction({'a': a, 'b': b, 'ab': ab}) return massfunction; class ClickProcessingUnitCookie(ClickProcessingUnit): """ Evidence number of clicks on the ad by user ID Create mass function for id cookie checking """ def process(self): numberclick = self.ecLogger.getClickNumFromCookie(self.hit) coefficient_value = Config.IDWeight a = coefficient_value * (1 - 1 / numberclick) b = 0 ab = 1 - a massfunction = MassFunction({'a': a, 'b': b, 'ab': ab}) return massfunction; class ClickProcessingUnitConfig(ClickProcessingUnit): """ Evidence number of clicks on the ad by user agent Create mass function for user agent checking """ def process(self): numberclick = self.ecLogger.getClickNumFromConfig(self.hit) coefficient_value = Config.UAWeight a = coefficient_value * (1 - 1 / numberclick) b = 0 ab = 1 - a massfunction = MassFunction({'a': a, 'b': b, 'ab': ab}) return massfunction class ClickProcessing(object): """ Main click processing function """ def __init__(self, hit, ecLogger): self.hit = hit self.ecLogger = ecLogger def process(self): processingList = self.getListOfProcessing() m = None # Loop for list of processing class for processing in processingList: # Init first mass function if not m: m = processing.process() else: # Dempster's rule of combination create a new mass function m = m & processing.process() return m def getListOfProcessing(self): """ Config list of processing """ dict1 = [] dict1.append(ClickProcessingUnitIp(self.hit, self.ecLogger)) dict1.append(ClickProcessingUnitCookie(self.hit, self.ecLogger)) dict1.append(ClickProcessingUnitConfig(self.hit, self.ecLogger)) return dict1 if __name__ == '__main__': """ Add click from command line argument """ try: hit = hit = Hit( ip=sys.argv[1], time=sys.argv[2], config=sys.argv[3], cookie=sys.argv[4], pubid=sys.argv[5], ) ecLogger = EcLogger() processing = ClickProcessing(hit, ecLogger) ecLogger.record(hit) m = processing.process() print(m.bel('a')) sys.exit(0) except Exception: print(0.01) sys.exit(0)
View Code
讀者朋友在閱讀這段程式碼的時候,要重點理解信度分配函數部分,
對信度函數的分配,涉及到我們如何對待解決問題進行抽象建模的過程,可以將其簡單理解為一種特徵工程。
Relevant Link:
https://github.com/afterlastangel/simple_ads_clickfraud_detection/blob/master/clickprocess.py https://ttu-ir.tdl.org/bitstream/handle/2346/46429/AGARWAL-THESIS.pdf
0x3:integrate the RSS data processed by multiple machine learning algorithms


# -*- coding: utf-8 -*- import os import time import numpy as np import scipy.io as sio import sklearn # 測試樣本對應的真實格點坐標 def test_real(path): real1=[] for i in range(grid_num): for j in range(grid_test_sample): real1.append(i+1) real=np.array(real1) return real # 模型:一共有56個格點,每個格點上訓練數據100組,測試數據10組 def read_data(path): pathtrain = os.path.join(path,'train.mat') traindataset = sio.loadmat(pathtrain).get('data') # 訓練數據 pathtest = os.path.join(path,'test.mat') testdataset = sio.loadmat(pathtest).get('X_test') # 測試數據 trainlabelset1 = [] for grid in range(grid_num): for sample in range(grid_train_sample): local_tmp = grid + 1 trainlabelset1.append(local_tmp) trainlabelset = np.array(trainlabelset1) # 訓練數據集對應坐標 testlabelset = test_real(path) # 測試數據集對應坐標 return traindataset, trainlabelset, testdataset, testlabelset # KNN Classifier # KNeighborsClassifier(n_neighbors=5, weights=』uniform』, algorithm=』auto』, leaf_size=30, p=2, # metric=』minkowski』, metric_params=None, n_jobs=1, **kwargs) def knn_classifier(train_x, train_y): from sklearn.neighbors import KNeighborsClassifier model = KNeighborsClassifier() model.fit(train_x, train_y) return model # Logistic Regression Classifier # LogisticRegression(penalty=』l2』, dual=False, tol=0.0001, C=1.0, fit_intercept=True, # intercept_scaling=1, class_weight=None, random_state=None, solver=』liblinear』, # max_iter=100, multi_class=』ovr』, verbose=0, warm_start=False, n_jobs=1) def logistic_regression_classifier(train_x, train_y): from sklearn.linear_model import LogisticRegression model = LogisticRegression(penalty='l2') model.fit(train_x, train_y) return model # Random Forest Classifier def random_forest_classifier(train_x, train_y): from sklearn.ensemble import RandomForestClassifier model = RandomForestClassifier(n_estimators=8) model.fit(train_x, train_y) return model # Decision Tree Classifier def decision_tree_classifier(train_x, train_y): from sklearn import tree model = tree.DecisionTreeClassifier() model.fit(train_x, train_y) return model # GBDT(Gradient Boosting Decision Tree) Classifier def gradient_boosting_classifier(train_x, train_y): from sklearn.ensemble import GradientBoostingClassifier model = GradientBoostingClassifier(n_estimators=200) model.fit(train_x, train_y) return model # SVM Classifier def svm_classifier(train_x, train_y): from sklearn.svm import SVC model = SVC(kernel='rbf', probability=True) model.fit(train_x, train_y) return model # SVM Classifier using cross validation def svm_cross_validation(train_x, train_y): from sklearn.grid_search import GridSearchCV from sklearn.svm import SVC model = SVC(kernel='rbf', probability=True) param_grid = {'C': [1e-3, 1e-2, 1e-1, 1, 10, 100, 1000], 'gamma': [0.001, 0.0001]} grid_search = GridSearchCV(model, param_grid, n_jobs = 1, verbose=1) grid_search.fit(train_x, train_y) best_parameters = grid_search.best_estimator_.get_params() for para, val in best_parameters.items(): print para, val model = SVC(kernel='rbf', C=best_parameters['C'], gamma=best_parameters['gamma'], probability=True) model.fit(train_x, train_y) return model def func_rmse(predict_label,real): pathlocal=os.path.join(path,'dist1.mat') local=sio.loadmat(pathlocal).get('dist')#56個格點坐標 tmp = predict_label-np.ones(np.shape(predict_label)) predict_2d = local[map(int,tmp)] tmp_real = real-np.ones(np.shape(predict_label)) real_2d = local[map(int,tmp_real)] norm_sqrt = (np.linalg.norm(predict_2d-real_2d,axis=-1))**2 rmse = np.sqrt(sum(norm_sqrt)/np.shape(predict_label)) return rmse def get_probability(data_chuang): """ 對窗數據來說,維度為:len_chuang*演算法個數 計算每種演算法下格點序號出現次數及概率 """ mygrid = range(1,57) cishu_total=[] for sf in range(len_sf): cishu=[] for item in mygrid: cishu_sf = data_chuang[:,sf].tolist().count(item) cishu.append(cishu_sf) cishu = np.array(cishu) cishu_total.append(cishu) probio_total = [item/float(len_chuang) for item in cishu_total] probio_total = np.array(probio_total).transpose() return probio_total if __name__ == '__main__': path = os.path.abspath('.') thresh = 0.5 model_save_file = None model_save = {} grid_num = 56 grid_train_sample = 100 grid_test_sample = 10 test_classifiers = ['KNN', 'LR', 'RF', 'DT', 'SVM'] classifiers = { 'KNN': knn_classifier, 'LR': logistic_regression_classifier, 'RF': random_forest_classifier, 'DT': decision_tree_classifier, 'SVM': svm_classifier, 'SVMCV': svm_cross_validation, 'GBDT': gradient_boosting_classifier } print 'reading training and testing data...' train_x, train_y, test_x, test_y = read_data(path) # print "test_x: ", test_x # print "test_y: ", test_y num_train, num_feat = train_x.shape num_test, num_feat = test_x.shape is_binary_class = (len(np.unique(train_y)) == 56) print '******************** Data Info *********************' print '#training data: %d, #testing_data: %d, dimension: %d' % (num_train, num_test, num_feat) predict_total2 = [] rmse_total = [] for classifier in test_classifiers: print '******************* %s ********************' % classifier start_time = time.time() model = classifiers[classifier](train_x, train_y) print 'training took %fs!' % (time.time() - start_time) predict = model.predict(test_x) if model_save_file != None: model_save[classifier] = model if is_binary_class: tmp = func_rmse(predict, test_y) rmse_total.append(tmp) accuracy = sklearn.metrics.accuracy_score(test_y, predict) print 'accuracy: %.2f%%' % (100 * accuracy) predict_total2.append(predict) predict_total1 = np.array(predict_total2) predict_total = predict_total1.transpose() print "predict_total: ", predict_total """ 對已經被機器學習演算法進行預測之後的數據矩陣predict_total(維度為:測試樣本數*演算法數) 滑窗對數據集進行處理,窗長度為len_chuang,每次下滑一格 對窗長度內的數據進行概率計算並DS融合 """ len_chuang = 10 # 窗數據長度 pre_ds = [] pre_dsjq = [] pre_max = [] pre_test = [] for start in range(0, 560 - len_chuang + 1): # 滑動窗口 data_chuang = predict_total[start:len_chuang+start, :] # 取出窗長度的數據 len_sf = len(data_chuang[0]) # 多演算法個數 probio_total = get_probability(data_chuang) # 窗數據矩陣中每種演算法對應所有格點出現概率 """ 1.普通ds 等同於求多種演算法概率平均之後,概率最大的格點為所預測格點 """ init_mean = np.mean(probio_total,1) # 每個格點下多種演算法提供的平均概率 init_ds = init_mean # 這部分並沒有起到實際作用 for i in range(len_sf-1): init_ds = np.multiply(init_ds,init_mean) k = sum(init_ds) bel = init_ds/float(k) pre_grid = np.argmax(bel)+1 pre_ds.append(pre_grid) """ 2.加強ds 求得多種演算法的源可信度,得到估計 """ m_between = np.zeros([len_sf,len_sf]) for i in range(len_sf): for j in range(len_sf): m_between[i, j]=sum(np.multiply(probio_total[:,i],probio_total[:,j])) # m_between兩個證據的內積=兩個證據中,每個出現事件的概率乘積*(出現事件的交集/並集) # 對單個事件而不是集合事件而言,等同於對應事件的概率乘積之和 d = np.zeros([len_sf,len_sf]) sim = np.zeros([len_sf,len_sf]) for i in range(len_sf): for j in range(len_sf): d[i,j]=np.sqrt(0.5*(m_between[i,i]+m_between[j,j]-2*m_between[i,j])) # d為兩個證據間的距離,距離越小表示兩個證據提出的意見越一致 sim[i,j]=1-d[i,j] # sim為兩個證據之間的相似度,越大代表兩個證據之間的一致性越強 sup = np.zeros(len_sf) for i in range(len_sf): sup[i]=sum(sim[i,:])-sim[i,i] # sup為對每個證據的支援度,為兩個證據之間的相似度之和減去該證據自己對自己的支援度 # 證據對自己的支援度為1 crd = np.zeros(len_sf) for i in range(len_sf): crd[i]=float(sup[i])/sum(sup) # crd為證據的可信度 # 即為歸一化的支援度,其他證據對該證據支援度越高,則可信度越高 A = np.zeros(grid_num) for i in range(grid_num): A[i] = sum(np.multiply(probio_total[i,:],crd)) # 將可信度作為源權重,估計所有情況下數據出現的概率 AA = A # 這部分並沒有起到實際作用 # 對於所有元素均為0-1的概率值,進行元素對於相乘之後並不改變元素的大小排序 for i in range(len_sf-1): init_ds = np.multiply(AA,A) # 分子為與某事件有交集的事件概率之乘積 k = sum(init_ds) # 分母K=∑(所有有交集的事件的概率乘積) # 或者為1-∑(所有不相交的時間概率乘積) # 對全部都是單事件而不是集體事件而言,有交集的事件即為事件其本身 # K表示了證據的衝突程度,衝突越大,越接近0,一致性越大,越接近1 bel = init_ds/float(k) pre_grid = np.argmax(bel)+1 pre_dsjq.append(pre_grid) # 下面兩行程式碼是為了驗證多次融合A矩陣並不能起到結果 testgrid = np.where(A == np.max(A))[0][0] pre_test.append(testgrid+1) """ 3.使用窗數據中出現次數最多的作為正確結果 """ data_chuang_list = data_chuang.flatten().tolist() matrix_cishu = max(data_chuang_list,key=data_chuang_list.count) pre_max.append(matrix_cishu) truth_chuang = test_y[len_chuang-1:] rmse_ds = func_rmse(pre_ds, truth_chuang) rmse_max = func_rmse(pre_max, truth_chuang) rmse_dsjq = func_rmse(pre_dsjq, truth_chuang) rmse_test = func_rmse(pre_test, truth_chuang) print "pre_ds: ", pre_ds print "rmse_ds: ", rmse_ds accuracy = sklearn.metrics.accuracy_score(test_y[len_chuang-1:], pre_ds) print 'accuracy: %.2f%%' % (100 * accuracy)
View Code
基於D-S證據融合演算法,對包括KNN、LR、RF、DT、SVM在內的5種演算法的概率預測結果(每一個演算法的預測結果相當於一個命題假設),進行融合,將融合得到的概率結果作為最終的概率預測結果。
可以看到,融合後的概率預測結果,總體上優於單個演算法的預測結果,僅略遜於LR演算法,同時規避了LR的過擬合問題。
Relevant Link:
https://github.com/Keats13/dempster_shafer_multi_rss
0x4:A fuzzy machine learning algorithm utilizing Dempster-Shafer and Bayesian Theory
Relevant Link:
https://github.com/aus10powell/fuzzy_dempster_shafer
0x5:Implementation in Python for Dempster Shafer algorythm with application in predicting movies genres by reviews
Relevant Link:
https://github.com/Moldoteck/Dempster-Shafer
Relevant Link:
https://github.com/bpietropaoli/THEGAME-Python https://github.com/amineremache/dempster-shafer/blob/master/dempster_shafer.py https://github.com/redaneqrouz/dempster-Shafer/blob/master/dams.py https://github.com/leokan92/dempster-shafer/blob/master/Dempstershafercomb.py https://github.com/Zhiming-Huang/Dempster-shafer-combination-rules/blob/master/DS.py
7. D-S理論和貝葉斯理論的區別
筆者在進行相關理論學習和編碼開發的過程中,隱約中感覺到D-S理論和Bayesian理論之間存在比較明顯的區別,似乎D-S是從貝葉斯理論的基礎上發展而來的。但是又不能完全理解其中的脈絡,這裡權且放一些粗淺的理論,留待將來解決。
- D-S和貝葉斯理論都是基於集合論構建出的理論體系,它們的基本計算單元都是離散集合。
- D-S在離散統計概率的基礎上,考慮了隨機事件概率之間的衝突性,提出了可信度分配和信度函數等新概念,使得對多源證據的融合成為可能