guava eventbus 原理+源碼分析

前言:

guava提供的eventbus可以很方便的處理一對多的事件問題, 最近正好使用到了,做個小結,使用的demo網上已經很多了,不再贅述,本文主要是源碼分析+使用注意點+新老版本eventbus實現方式對比

一.原理

 

 

將定義的hander註冊到eventbus中,eventbus遍歷該handler及其父類中含有@subscribe註解的方法,封裝成subscriber對象,一個event會對應多個方法,Map<EventType.class,List<Subscriber>>,但既然是guava出品,這種情況下一定會用自己家的MultiMap了,接收到event後根據類型匹配對應的subscriber去執行,接下來從源碼角度探究下

 二.源碼分析

主要分析註冊與分發處理,會貼相關的源碼的注釋(guava版本github 2021 1月版本),方便你閱讀

1.註冊流程

分析之前我們先簡要拓展下關於guava cache的用法,compute if absent,不存在則計算,對應getOrLoad方法(暴露給用戶的是get()),有則直接返回,

註冊流程抓住一個關鍵點即可,即一個subscriber對應一個被@subscriber標記的method,為了方便閱讀,我把程式碼貼到一起

  1   /** Registers all subscriber methods on the given listener object. */
  2   void register(Object listener) {
  3     // key-eventType.class value-List<Subscriber>,一個subscriber對應一個方法
  4     Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);
  5 
  6     for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
  7       Class<?> eventType = entry.getKey();
  8       Collection<Subscriber> eventMethodsInListener = entry.getValue();
  9       // 並發讀寫
 10       CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
 11 
 12       if (eventSubscribers == null) {
 13         CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
 14         // eventType.class不存在時才put,concurrenthashmap的putIfAbsent()
 15         // 有可能為null,用newSet替換
 16         eventSubscribers =
 17             MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
 18       }
 19       // 添加
 20       eventSubscribers.addAll(eventMethodsInListener);
 21     }
 22   }
 23   
 24   
 25   /**
 26    * Returns all subscribers for the given listener grouped by the type of event they subscribe to.
 27    */
 28   private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
 29     Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
 30     Class<?> clazz = listener.getClass();
 31     for (Method method : getAnnotatedMethods(clazz)) {
 32       Class<?>[] parameterTypes = method.getParameterTypes();
 33       Class<?> eventType = parameterTypes[0];
 34       // 創建subscriber時,如果未添加@AllowConcurrentEvents註解則生成同步的subscriber
 35       methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
 36     }
 37     return methodsInListener;
 38   }
 39 
 40   private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
 41     try {
 42       return subscriberMethodsCache.getUnchecked(clazz);
 43     } catch (UncheckedExecutionException e) {
 44       throwIfUnchecked(e.getCause());
 45       throw e;
 46     }
 47   }
 48 
 49 // 映射關係快取,getOrload
 50   private static final LoadingCache<Class<?>, ImmutableList<Method>> subscriberMethodsCache =
 51       CacheBuilder.newBuilder()
 52           .weakKeys()
 53           .build(
 54               new CacheLoader<Class<?>, ImmutableList<Method>>() {
 55                 @Override
 56                 public ImmutableList<Method> load(Class<?> concreteClass) throws Exception {
 57                   return getAnnotatedMethodsNotCached(concreteClass);
 58                 }
 59               });
 60 
 61 private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
 62     // 獲得listener的所有父類及自身的class(包括介面)
 63     Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
 64     Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
 65     for (Class<?> supertype : supertypes) {
 66       for (Method method : supertype.getDeclaredMethods()) {
 67         if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
 68           // TODO(cgdecker): Should check for a generic parameter type and error out
 69           Class<?>[] parameterTypes = method.getParameterTypes();
 70           // 參數校驗,@subscribe註解的方法有且有能有一個非原始類型參數
 71           checkArgument(
 72               parameterTypes.length == 1,
 73               "Method %s has @Subscribe annotation but has %s parameters. "
 74                   + "Subscriber methods must have exactly 1 parameter.",
 75               method,
 76               parameterTypes.length);
 77 
 78           checkArgument(
 79               !parameterTypes[0].isPrimitive(),
 80               "@Subscribe method %s's parameter is %s. "
 81                   + "Subscriber methods cannot accept primitives. "
 82                   + "Consider changing the parameter to %s.",
 83               method,
 84               parameterTypes[0].getName(),
 85               Primitives.wrap(parameterTypes[0]).getSimpleName());
 86 
 87           MethodIdentifier ident = new MethodIdentifier(method);
 88           // 重寫的方法只放入一次
 89           if (!identifiers.containsKey(ident)) {
 90             identifiers.put(ident, method);
 91           }
 92         }
 93       }
 94     }
 95     return ImmutableList.copyOf(identifiers.values());
 96   }
 97 
 98 
 99   // 創建subscriber
100   static Subscriber create(EventBus bus, Object listener, Method method) {
101     return isDeclaredThreadSafe(method)
102         ? new Subscriber(bus, listener, method)
103         : new SynchronizedSubscriber(bus, listener, method);
104   }
105 
106   @VisibleForTesting
107   static final class SynchronizedSubscriber extends Subscriber {
108 
109     private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
110       super(bus, target, method);
111     }
112 
113     @Override
114     void invokeSubscriberMethod(Object event) throws InvocationTargetException {
115       synchronized (this) {
116         super.invokeSubscriberMethod(event);
117       }
118     }
119   }

值得注意的是subscriber的生成,即便你使用了AsyncEventbus,卻沒有在處理方法上聲明@AllowConcurrentEvents,那麼在處理event時仍然是同步執行的,註冊流程並發安全問題請看第三部分

2.分發流程

先看下如何獲得event對應的subscriber

 1 public void post(Object event) {
 2     Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
 3     if (eventSubscribers.hasNext()) {
 4       // 分發,dispatcher有三種實現,ImmediateDispatcher(同步處理event,深度優先)
 5       // LegacyAsyncDispatcher(非同步處理event)
 6       // PerThreadQueuedDispatcher(默認,同步調用,廣度優先) 內置隊列,可以保證同一執行緒內的event的順序
 7       dispatcher.dispatch(event, eventSubscribers);
 8     } else if (!(event instanceof DeadEvent)) {
 9       // the event had no subscribers and was not itself a DeadEvent
10       // 把所有沒有被訂閱的event包裝成deadevent,用戶可以自己定義處理deadevent的方法,作為兜底
11       post(new DeadEvent(this, event));
12     }
13   }
14 
15   Iterator<Subscriber> getSubscribers(Object event) {
16     //獲得event的所有父類及自身的class(包括介面),從獲取subscriber的流程來看,post一個event
17     // 時,除了調用該event的處理方法也會調用該event父類的處理方法
18     ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());
19 
20     List<Iterator<Subscriber>> subscriberIterators =
21         Lists.newArrayListWithCapacity(eventTypes.size());
22 
23     for (Class<?> eventType : eventTypes) {
24       CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
25       if (eventSubscribers != null) {
26         // eager no-copy snapshot
27         subscriberIterators.add(eventSubscribers.iterator());
28       }
29     }
30     // 類似flatmap,扁平化
31     return Iterators.concat(subscriberIterators.iterator());
32   }
33 
34   @VisibleForTesting
35   static ImmutableSet<Class<?>> flattenHierarchy(Class<?> concreteClass) {
36     try {
37       return flattenHierarchyCache.getUnchecked(concreteClass);
38     } catch (UncheckedExecutionException e) {
39       throw Throwables.propagate(e.getCause());
40     }
41   }
42 
43   private static final LoadingCache<Class<?>, ImmutableSet<Class<?>>> flattenHierarchyCache =
44       CacheBuilder.newBuilder()
45           .weakKeys()
46           .build(
47               new CacheLoader<Class<?>, ImmutableSet<Class<?>>>() {
48                 // <Class<?>> is actually needed to compile
49                 @SuppressWarnings("RedundantTypeArguments")
50                 @Override
51                 public ImmutableSet<Class<?>> load(Class<?> concreteClass) {
52                   return ImmutableSet.<Class<?>>copyOf(
53                       TypeToken.of(concreteClass).getTypes().rawTypes());
54                 }
55               });

從程式碼可以看出,先對該event查詢上級,最後把所有event對應的subscriber返回,因此觸發一個event時,其父event的subscriber也會被調用

接下來看下post,流程eventbus有三種dispatcher(ImmediaDispatcher,PerThreadDispatcher,LegacyAsyncDispatcher)eventbus使用的是PerThreadDispatcher,AsyncEventBus使用LegacyAsyncDispatcher

①ImmediaDispatcher

從名字中的Immedia”即時”就能看出這個dispatcher收到event後會立即處理,不會進行非同步處理

程式碼如下:

 從圖中可以看出ImmediaDispatcher是針對每個event,調用其全部的subscriber進行處理,即儘可能多的調用subscriber,所以是廣度優先,這個dispatcher目前未被使用,了解即可

 ②PerThreadQueueDispatcher(默認的dispatcher)

同樣從名稱可以看出這種dispatcher是一個thread一個queue,那我們可以猜測內部有可能用了ThreadLocal,既然用了隊列,說明想要起到一個緩衝event處理的過程

隊列的緩衝功能使得dispatcher有能力吞吐更高的event,因此是一種深度優先策略,此外每執行緒每隊列的方式保證了event處理過程是對於每個執行緒而言是有序的,同樣是廣度優先,對

每一個event都分發到相關的subscriber進行處理,除此之外還有一個值得稱道的點,即Dispatching變數的使用,規避了遞歸產生的死循環問題

 1 private static final class PerThreadQueuedDispatcher extends Dispatcher {
 2 
 3     // This dispatcher matches the original dispatch behavior of EventBus.
 4 
 5     /** Per-thread queue of events to dispatch. */
 6     private final ThreadLocal<Queue<Event>> queue =
 7         new ThreadLocal<Queue<Event>>() {
 8           @Override
 9           protected Queue<Event> initialValue() {
10             return Queues.newArrayDeque();
11           }
12         };
13 
14     /** Per-thread dispatch state, used to avoid reentrant event dispatching. */
15     private final ThreadLocal<Boolean> dispatching =
16         new ThreadLocal<Boolean>() {
17           @Override
18           protected Boolean initialValue() {
19             return false;
20           }
21         };
22 
23     @Override
24     void dispatch(Object event, Iterator<Subscriber> subscribers) {
25       checkNotNull(event);
26       checkNotNull(subscribers);
27       // 如果只從程式碼來看,PerThreadQueuedDispatcher的dispatch方法始終
28       // 是單執行緒調用,並不需要ThreadLocal,但從拓展的角度看,當用戶自定義xxeventbus自己實現分發邏輯時,PerThreadQueuedDispatcher實現了執行緒安全的dispatch
29       //因為eventbus有可能會被多個執行緒調用,從框架的角度看,無論用戶是否多執行緒調用,都應該要保證執行緒安全
30       // 引用issue 3530中 //github.com/google/guava/issues/3530 的一個回答 if multiple threads are dispatching to this dispatcher, they will read different values for queueForThread and dispatching.
31       Queue<Event> queueForThread = queue.get();
32       queueForThread.offer(new Event(event, subscribers));
33 
34       // 如果未開始分發事件則進行處理,解決subscriber遞歸調用post產生的死循環
35       if (!dispatching.get()) {
36         dispatching.set(true);
37         try {
38           Event nextEvent;
39           // 對每一個event,分發到相關的subscribers中
40           while ((nextEvent = queueForThread.poll()) != null) {
41             while (nextEvent.subscribers.hasNext()) {
42               nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
43             }
44           }
45         } finally {
46           dispatching.remove();
47           queue.remove();
48         }
49       }
50     }

接下來看下剛剛說的dispatching的妙用demo

在guava-test下建立一個新的目錄方便我們修改源碼後進行測試,測試程式碼如下

Listener

 1 /**
 2  * @author tele
 3  * @Description
 4  * @create 2020-11-23
 5  */
 6 public class Listener {
 7 
 8     private final EventBus eventBus;
 9 
10     public Listener(EventBus eventBus) {
11         this.eventBus = eventBus;
12     }
13 
14     @Subscribe
15     public void record(String s) {
16         eventBus.post(s);
17         System.out.println("receive:"+ s);
18     }
19 }

Producer

 1 /**
 2  * @author tele
 3  * @Description
 4  * @create 2020-11-23
 5  */
 6 public class Producer {
 7 
 8     public String produce() {
 9         return "hello";
10     }
11 }

Main

 1 /**
 2  * @author tele
 3  * @Description
 4  * @create 2020-11-23
 5  */
 6 public class Main {
 7 
 8     public static void main(String[] args) {
 9         EventBus eventBus = new EventBus();
10         Listener listener = new Listener(eventBus);
11         Producer producer = new Producer();
12         eventBus.register(listener);
13         String produce = producer.produce();
14         eventBus.post(produce);
15     }
16 
17 }

程式碼很簡單,問題在於Listener遞歸調用了post方法,按照程式碼示意運行後會棧溢出(隊列中event堆積),receive:hello永遠不會列印,可事實真的如此嗎?

 很奇怪是嗎,並沒有產生堆棧溢出的問題,反而是不停的輸出receive:hello,接下來我們修改下PerThreadDispatcher的程式碼,將dispatching變數注釋掉

  

再執行下demo

 果然溢出了,關鍵點就在於dispatching變數對於同一執行緒的遞歸分發進行了處理,已經處理過就不再次進行分發,這樣我們的遞歸調用不停的產生的event得以被處理

 ③LegacyAsyncDispatcher

看名字挺奇怪的,但有async字樣,所以是非同步的dispatcher,LegacyAsyncDispacther是AsyncEventBus的專用dispatcher,由於將event對應的subscriber拆分後入隊,多執行緒情況下無法保證event入隊順序,也就無法保證subscriber的調用順序,但這樣處理實現了深度優先,即儘可能多的調用不同的event的subscriber,與PerThreadDispatcher相比程式碼難度小了不少,由於AsyncEventBus的初始化需要傳入執行緒池參數,所以AsyncEventBus實現了真正的非同步處理

 1 /** Implementation of a {@link #legacyAsync()} dispatcher. */
 2   private static final class LegacyAsyncDispatcher extends Dispatcher {
 3 
 4     // This dispatcher matches the original dispatch behavior of AsyncEventBus.
 5     //
 6     // We can't really make any guarantees about the overall dispatch order for this dispatcher in
 7     // a multithreaded environment for a couple reasons:
 8     //
 9     // 1. Subscribers to events posted on different threads can be interleaved with each other
10     //    freely. (A event on one thread, B event on another could yield any of
11     //    [a1, a2, a3, b1, b2], [a1, b2, a2, a3, b2], [a1, b2, b3, a2, a3], etc.)
12     // 2. It's possible for subscribers to actually be dispatched to in a different order than they
13     //    were added to the queue. It's easily possible for one thread to take the head of the
14     //    queue, immediately followed by another thread taking the next element in the queue. That
15     //    second thread can then dispatch to the subscriber it took before the first thread does.
16     //
17     // All this makes me really wonder if there's any value in queueing here at all. A dispatcher
18     // that simply loops through the subscribers and dispatches the event to each would actually
19     // probably provide a stronger order guarantee, though that order would obviously be different
20     // in some cases.
21 
22     /** Global event queue. */
23     private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
24         Queues.newConcurrentLinkedQueue();
25 
26     @Override
27     void dispatch(Object event, Iterator<Subscriber> subscribers) {
28       checkNotNull(event);
29       // 拆分後入隊
30       while (subscribers.hasNext()) {
31         queue.add(new EventWithSubscriber(event, subscribers.next()));
32       }
33 
34       EventWithSubscriber e;
35       while ((e = queue.poll()) != null) {
36         e.subscriber.dispatchEvent(e.event);
37       }
38     }
39 
40     private static final class EventWithSubscriber {
41       private final Object event;
42       private final Subscriber subscriber;
43 
44       private EventWithSubscriber(Object event, Subscriber subscriber) {
45         this.event = event;
46         this.subscriber = subscriber;
47       }
48     }
49   }

注意點:

1.eventbus默認使用的執行緒池MoreExecutors.directExecutor(),其execute方法是直接調用傳入的runnable的run方法,是非非同步的 

2.使用AsyncEventBus時,請在對應的方法上添加@AllowConcurrenEvents

三.從並發安全的角度出發,對比下新老版本的註冊流程

本部分為補充內容,重點探討新老版本的註冊並發安全問題,可略過

從20.0開始,event bus的註冊程變成了上面分析的,那麼之前的版本是如何實現的呢,一起來分析下.先切到16.0 的tag,註冊程式碼如下

顯然是使用了讀寫鎖,不加鎖,eventType會相互覆蓋(HashMultiMap是非執行緒安全的),先給eventbus加個getSubscriberByType(),記得修改下EventSubscriber的修飾符為public,然後做個多執行緒的測試

 1 /**
 2  * @author tele
 3  * @Description
 4  * @create 2021-01-24
 5  */
 6 public class ListenerA {
 7 
 8     @Subscribe
 9     public void handle(String msg) {
10         System.out.println("ListenerA:" + msg);
11     }
12 
13 }
14 
15 /**
16  * @author tele
17  * @Description
18  * @create 2021-01-24
19  */
20 public class ListenerB {
21 
22     @Subscribe
23     public void handle(String msg) {
24         System.out.println("ListenerB:" + msg);
25     }
26 
27 }
28 
29 /**
30  * @author tele
31  * @Description
32  * @create 2021-01-24
33  */
34 public class Main {
35 
36 
37     public static void main(String[] args) throws InterruptedException {
38 
39         final EventBus eventBus = new EventBus();
40         final ListenerA a = new ListenerA();
41         ListenerB b = new ListenerB();
42         CountDownLatch countDownLatch = new CountDownLatch(6);
43 
44         Runnable r1 = ()-> {
45             eventBus.register(a);
46             countDownLatch.countDown();
47         };
48         Thread t1 = new Thread(r1);
49         Thread t2 = new Thread(r1);
50         Thread t3 = new Thread(r1);
51 
52         Runnable r2 = ()-> {
53             eventBus.register(b);
54             countDownLatch.countDown();
55         };
56         Thread t4 = new Thread(r2);
57         Thread t5 = new Thread(r2);
58         Thread t6 = new Thread(r2);
59 
60         t1.start();
61         t2.start();
62         t3.start();
63         t4.start();
64         t5.start();
65         t6.start();
66         countDownLatch.await();
67         SetMultimap<Class<?>, EventSubscriber> subscribersByType = eventBus.getSubscribersByType();
68         subscribersByType.asMap().forEach((k,v)-> {
69             System.out.println("key:" + k);
70             v.forEach(System.out::println);
71         });
72     }
73 }

輸出結果如下:

 ok,沒啥問題,接下來再修改下源碼把使用讀寫鎖的兩行程式碼注釋掉,再執行下程式碼

 

 輸出結果如下:

顯然,ListenerA的註冊結果被覆蓋了,這裡簡要說下原因,subscribersByType,k-v結構簡略表示為 K-event.class ,value-Set<Listener.class>,我們知道java中的hashset不重複的特性是基於hashmap實現的.同樣的,這裡的SetMultiMap實際是用的HashMultiMap,翻翻源碼就知道了,內部存儲數據的容器是hashmap,那麼這個問題就轉換成了hashmap的執行緒安全問題了,hashmap多執行緒put hash相同的元素會產生丟失問題,多執行緒下同時put get有可能導致get 出null.了解到這我們就知道為什麼要加鎖了,使用讀寫鎖的版本一直持續到19.0,從20.0開始從開始使用並發容器代替讀寫鎖,因為對於eventbus而言始終是讀遠大於寫,基於cow機制實現的CopyOnWriteArrayList在讀寫同時進行時通過延遲更新的策略不阻塞執行緒,對於event的處理 而言是可以接受的,因為本次event在post時沒有分發到對應的subsriber,下次同類型的event觸發就ok了,事實上,這種場景極少,因為從使用經歷來看,一般是項目啟動時就註冊,分發都是需要處理邏輯時才會觸發,不阻塞與每次都需要加解讀鎖相比,顯然不阻塞的性能更好了.老版本的分發流程不再贅述,因為確實沒啥好分析的了,如果你能看懂上面分析的新版本的dispatcher,當你看老版本的時候就會感覺很簡單了

四.優勢與缺陷

1.進程內使用,無法實現跨進程處理,需要跨進程傳遞消息,還是老老實實的用消息隊列吧

2.和redis一樣基於記憶體,天然的不可靠,redis好歹還有aof和rdb,可event bus沒有任何持久化機制

3.個人對新版的Subscriber實現方式有點看法,沒必須要把執行緒池參數傳遞給Subscriber,因為Subscriber只是被執行者,16.0的版本執行緒池參數是AsyncEventBus持有

4.優勢:簡單,開箱即用

五.小結

1.只分析了註冊與分發流程,異常處理之類的沒有涉及,用法的話,網上已經很多了,不再贅述

2.event bus的程式碼很巧妙,細細品味還有很多巧妙之處,比如上面那個dispatching變數

六.參考文檔

1.github //github.com/google/guava/wiki/EventBusExplained#for-producers

 

Tags: