[Source code analysis] Parallel distributed task queue Celery: the EventDispatcher & Event components


0x00 Abstract

Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling.

This article explains how the EventDispatcher and Event components are implemented.

0x01 Approach

The EventDispatcher and Event components are responsible for handling Celery's internal events (Event).

From the name alone we know that the EventDispatcher component dispatches events, so we already have the following information:

  • event dispatching implies producers and consumers; the EventDispatcher acts as the event producer;
  • production and consumption require a broker to store the events in transit;
  • Celery is built on Kombu, and Kombu already has the producer and consumer concepts, so those can be used directly here;
  • Kombu also provides a Mailbox implementation, which lets different instances send and handle events, either as unicast or as broadcast;

So we can roughly infer that the EventDispatcher can make use of Kombu's producer and consumer, or of its Mailbox. A minimal sketch of these Kombu concepts follows.
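
As a refresher, here is a minimal sketch of Kombu's producer/consumer pattern (illustration only, not Celery code; the exchange and queue names are made up, and a local Redis broker is assumed):

from kombu import Connection, Consumer, Exchange, Producer, Queue

# A fanout exchange, like the one Celery events use on Redis.
exchange = Exchange('demo_ev', type='fanout')
queue = Queue('demo_ev.listener', exchange=exchange)

def on_message(body, message):
    print('received:', body)
    message.ack()

with Connection('redis://localhost:6379//') as conn:
    # Consumer first: on Redis, fanout is pub/sub, so subscribers only
    # see messages published while they are subscribed.
    with Consumer(conn, queues=[queue], callbacks=[on_message],
                  accept=['json']):
        producer = Producer(conn.default_channel, exchange=exchange)
        producer.publish({'type': 'worker-online'}, declare=[queue],
                         serializer='json')
        conn.drain_events(timeout=5)  # delivers to on_message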

Events, on the other hand, is responsible for receiving events, so we can also infer:

  • Events uses a Kombu consumer to handle events;
  • how exactly an event is handled depends on Celery's current state, which brings in the State component;

Let's now look at how this is actually implemented.

To make this easier to follow, the overall logic is summarized in the diagram at the end of this article.

0x02 Definition

The EventDispatcher code lives in celery\events\dispatcher.py.

From the definition we can see which member variables an event dispatcher needs in order to do its job:

  • connection (kombu.Connection): the connection used to talk to the broker;
  • channel (kombu.Channel): a Channel can be understood as one of several lightweight connections sharing a single Connection; it is the actual connection.
    • Connection is AMQP's abstraction of a connection;
    • Channel is AMQP's abstraction of operations on the MQ;
    • Taking the "lightweight connection to Redis" case concretely: a Channel can be seen as a wrapper around Redis operations and the Redis connection. Each Channel can establish a connection to Redis and operate on Redis over it; each connection has a socket, each socket has a file descriptor, and that file descriptor can be polled.
  • producer: the event producer, using Kombu's producer concept;
  • exchange: when the producer publishes an event, the event first goes to the Exchange, which routes it to queues according to the exchange-queue bindings;
  • hostname: identifies this dispatcher, so that users of the EventDispatcher can recognize and address it;
  • groups: event-group support;
  • _outbound_buffer: the event buffer;
  • clock: a Lamport logical clock, the mechanism used in distributed systems to order events (a toy sketch follows this list);
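
For readers unfamiliar with Lamport clocks, here is a toy sketch of the semantics; the real implementation behind app.clock is kombu.clocks.LamportClock, which behaves essentially like this:

class ToyLamportClock:
    """Toy sketch of Lamport clock semantics (illustration only)."""

    def __init__(self):
        self.value = 0

    def forward(self):
        # Tick before sending an event; the new value is stamped on it.
        self.value += 1
        return self.value

    def adjust(self, other):
        # On receiving an event stamped `other`, jump past the sender's
        # clock so that "send happens-before receive" is preserved.
        self.value = max(self.value, other) + 1
        return self.value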

The class definition is:

class EventDispatcher:
    """Dispatches event messages.
    """

    DISABLED_TRANSPORTS = {'sql'}

    app = None

    def __init__(self, connection=None, hostname=None, enabled=True,
                 channel=None, buffer_while_offline=True, app=None,
                 serializer=None, groups=None, delivery_mode=1,
                 buffer_group=None, buffer_limit=24, on_send_buffered=None):
        self.app = app_or_default(app or self.app)
        self.connection = connection
        self.channel = channel
        self.hostname = hostname or anon_nodename()
        self.buffer_while_offline = buffer_while_offline
        self.buffer_group = buffer_group or frozenset()
        self.buffer_limit = buffer_limit
        self.on_send_buffered = on_send_buffered
        self._group_buffer = defaultdict(list)
        self.mutex = threading.Lock()
        self.producer = None
        self._outbound_buffer = deque()
        self.serializer = serializer or self.app.conf.event_serializer
        self.on_enabled = set()
        self.on_disabled = set()
        self.groups = set(groups or [])
        self.tzoffset = [-time.timezone, -time.altzone]
        self.clock = self.app.clock
        self.delivery_mode = delivery_mode
        if not connection and channel:
            self.connection = channel.connection.client
        self.enabled = enabled
        conninfo = self.connection or self.app.connection_for_write()
        self.exchange = get_exchange(conninfo,
                                     name=self.app.conf.event_exchange)
        if conninfo.transport.driver_type in self.DISABLED_TRANSPORTS:
            self.enabled = False
        if self.enabled:
            self.enable()
        self.headers = {'hostname': self.hostname}
        self.pid = os.getpid()

We first show the variable contents at this point, to give a concrete picture:

self = {EventDispatcher} <celery.events.dispatcher.EventDispatcher object at 0x000001D37765B308>
 DISABLED_TRANSPORTS = {set: 1} {'sql'}
 app = {Celery} <Celery myTest at 0x1d375a69e88>
 buffer_group = {frozenset: 0} frozenset()
 buffer_limit = {int} 24
 buffer_while_offline = {bool} True
 channel = {NoneType} None
 clock = {LamportClock} 0
 connection = {Connection} <Connection: redis://localhost:6379// at 0x1d37765b388>
 delivery_mode = {int} 1
 enabled = {bool} True
 exchange = {Exchange} Exchange celeryev(fanout)
 groups = {set: 1} {'worker'}
 headers = {dict: 1} {'hostname': 'celery@DESKTOP-0GO3RPO'}
 hostname = {str} 'celery@DESKTOP-0GO3RPO'
 mutex = {lock} <unlocked _thread.lock object at 0x000001D377623A20>
 on_disabled = {set: 1} {<bound method Heart.stop of <celery.worker.heartbeat.Heart object at 0x000001D377636408>>}
 on_enabled = {set: 1} {<bound method Heart.start of <celery.worker.heartbeat.Heart object at 0x000001D377636408>>}
 on_send_buffered = {NoneType} None
 pid = {int} 26144
 producer = {Producer} <Producer: <promise: 0x1d37761cf78>>
 publisher = {Producer} <Producer: <promise: 0x1d37761cf78>>
 serializer = {str} 'json'
 tzoffset = {list: 2} [28800, 32400]
  _group_buffer = {defaultdict: 0} defaultdict(<class 'list'>, {})
  _outbound_buffer = {deque: 0} deque([])

0x03 Producer

We can see that the EventDispatcher does use Kombu's Producer (Celery wraps Kombu with its amqp layer here). So the key point is how the Producer is configured.

Specifically, two things need to be configured:

  • the Connection, so we know which Redis to talk to;

  • the Exchange, so we know which queue events go through;

Let's analyze them one by one.

3.1 Connection

As the code shows, the Connection comes directly from Celery's connection_for_write:

conninfo = self.connection or self.app.connection_for_write()

The variables at this point are:

connection = {Connection} <Connection: redis://localhost:6379// at 0x1be931de148>
conninfo = {Connection} <Connection: redis://localhost:6379// at 0x1be931de148>

3.2 Exchange

The Exchange concept works as follows:

  • Exchange: the exchange, or router. Event senders publish events to an Exchange, and the Exchange is responsible for delivering them to queues;
  • Queue: the event queue, holding events about to be consumed by the application; the Exchange delivers events to Queues, and consumers receive events from Queues;

Concretely, an Exchange routes events: events are sent to the exchange, and the exchange hands them to the matching queues.

An exchange forwards an event by matching the event's routing_key against the binding_key; the binding_key is the binding a consumer declares between its queue and the exchange.

Routing, then, is a comparison of the routing-key (supplied by the message) against the binding-key (supplied when the queue is bound to the exchange).

When using an exchange, you specify its name and type (direct, topic, or fanout). This is the same exchange concept as in RabbitMQ: events are sent to exchanges, exchanges can be named, and routing behavior is configurable. A small routing sketch follows.
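
To make routing_key vs. binding_key concrete, here is a small sketch using a topic exchange (the names are illustrative and a local Redis broker is assumed):

from kombu import Connection, Exchange, Producer, Queue

ex = Exchange('demo_routing', type='topic')
# The binding_key 'worker.#' matches any routing key starting with 'worker.'.
q = Queue('demo_routing.worker', exchange=ex, routing_key='worker.#')

with Connection('redis://localhost:6379//') as conn:
    producer = Producer(conn.default_channel, exchange=ex)
    # routing_key 'worker.online' matches the binding above, so this
    # message lands in q; 'task.succeeded' would not match it.
    producer.publish({'type': 'worker-online'}, routing_key='worker.online',
                     declare=[q], serializer='json')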

Now back to the code.

def get_exchange(conn, name=EVENT_EXCHANGE_NAME):
    """Get exchange used for sending events.

    Arguments:
        conn (kombu.Connection): Connection used for sending/receiving events.
        name (str): Name of the exchange. Default is ``celeryev``.

    Note:
        The event type changes if Redis is used as the transport
        (from topic -> fanout).
    """
    ex = copy(event_exchange)
    if conn.transport.driver_type == 'redis':
        # quick hack for Issue #436
        ex.type = 'fanout'
    if name != ex.name:
        ex.name = name
    return ex

The variables at this point are:

EVENT_EXCHANGE_NAME = 'celeryev'
    
self.exchange = {Exchange} Exchange celeryev(fanout)

So the default Exchange here is celeryev, with type fanout.

3.3 Building the Producer

And so we arrive at the Producer itself:

    def enable(self):
        self.producer = Producer(self.channel or self.connection,
                                 exchange=self.exchange,
                                 serializer=self.serializer,
                                 auto_declare=False)
        self.enabled = True
        for callback in self.on_enabled:
            callback()
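
As a usage sketch, here is how an EventDispatcher obtained from the app can send a custom event. It assumes the app.events.default_dispatcher() context manager Celery exposes for this purpose; the event type and fields here are made up:

from celery import Celery

app = Celery('myTest', broker='redis://localhost:6379//')

# default_dispatcher acquires a producer from the pool and yields an
# EventDispatcher bound to it.
with app.events.default_dispatcher() as dispatcher:
    # Event types follow the '<group>-<subject>' convention.
    dispatcher.send('worker-custom-event', payload={'answer': 42})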

0x04 Dispatching events

Now that the Producer has been built, we can send events.

4.1 Send

Sending an event boils down to whether it needs to be sent as part of a group:

  • if group buffering applies, the event goes into an internal buffer and is flushed as a batch;
  • otherwise, the Producer's publish API is called directly.

Which group an event belongs to is determined by the following code:

groups, group = self.groups, group_from(type)

The relevant variables are:

group = {str} 'worker'
groups = {set: 1} {'worker'}
type = {str} 'worker-online'
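
group_from simply takes the part of the event type before the first dash; the helper in celery.events.event is essentially:

def group_from(type):
    """Get the group part of an event type name."""
    return type.split('-', 1)[0]

assert group_from('worker-online') == 'worker'
assert group_from('task-succeeded') == 'task'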

The send code is as follows:

    def send(self, type, blind=False, utcoffset=utcoffset, retry=False,
             retry_policy=None, Event=Event, **fields):
        """Send event.
        """
        if self.enabled:
            groups, group = self.groups, group_from(type)
            if groups and group not in groups:
                return
            if group in self.buffer_group:
                clock = self.clock.forward()
                event = Event(type, hostname=self.hostname,
                              utcoffset=utcoffset(),
                              pid=self.pid, clock=clock, **fields)
                buf = self._group_buffer[group]
                buf.append(event)
                if len(buf) >= self.buffer_limit:
                    self.flush()
                elif self.on_send_buffered:
                    self.on_send_buffered()
            else:
                return self.publish(type, fields, self.producer, blind=blind,
                                    Event=Event, retry=retry,
                                    retry_policy=retry_policy)

4.2 publish: interacting with the broker

send ends up calling into here.

Here the routing_key is built:

routing_key=type.replace('-', '.')

This yields the routing_key 'worker.online'.

The Event itself is also built:

event = {dict: 13} 
 'hostname' = {str} 'celery@DESKTOP-0GO3RPO'
 'utcoffset' = {int} -8
 'pid' = {int} 24320
 'clock' = {int} 1
 'freq' = {float} 2.0
 'active' = {int} 0
 'processed' = {int} 0
 'loadavg' = {tuple: 3} (0.0, 0.0, 0.0)
 'sw_ident' = {str} 'py-celery'
 'sw_ver' = {str} '5.0.5'
 'sw_sys' = {str} 'Windows'
 'timestamp' = {float} 1611464767.3456059
 'type' = {str} 'worker-online'
 __len__ = {int} 13

The publish code is as follows:

    def publish(self, type, fields, producer,
                blind=False, Event=Event, **kwargs):
        """Publish event using custom :class:`~kombu.Producer`.

        Arguments:
            type (str): Event type name, with group separated by dash (`-`).
                fields: Dictionary of event fields, must be json serializable.
            producer (kombu.Producer): Producer instance to use:
                only the ``publish`` method will be called.
            retry (bool): Retry in the event of connection failure.
            retry_policy (Mapping): Map of custom retry policy options.
                See :meth:`~kombu.Connection.ensure`.
            blind (bool): Don't set logical clock value (also don't forward
                the internal logical clock).
            Event (Callable): Event type used to create event.
                Defaults to :func:`Event`.
            utcoffset (Callable): Function returning the current
                utc offset in hours.
        """
        clock = None if blind else self.clock.forward()
        event = Event(type, hostname=self.hostname, utcoffset=utcoffset(),
                      pid=self.pid, clock=clock, **fields)
        with self.mutex:
            return self._publish(event, producer,
                                 routing_key=type.replace('-', '.'), **kwargs)

    def _publish(self, event, producer, routing_key, retry=False,
                 retry_policy=None, utcoffset=utcoffset):
        exchange = self.exchange
        try:
            producer.publish(
                event,
                routing_key=routing_key,
                exchange=exchange.name,
                retry=retry,
                retry_policy=retry_policy,
                declare=[exchange],
                serializer=self.serializer,
                headers=self.headers,
                delivery_mode=self.delivery_mode,
            )
        except Exception as exc:  # pylint: disable=broad-except
            if not self.buffer_while_offline:
                raise
            self._outbound_buffer.append((event, routing_key, exc))

Because this goes through pub/sub, the event content cannot be seen in Redis.

The Redis content at this point is as follows (no events visible):

redis-cli.exe -p 6379
127.0.0.1:6379> keys *
1) "_kombu.binding.celery.pidbox"
2) "_kombu.binding.celery"
3) "_kombu.binding.celeryev"
127.0.0.1:6379> smembers _kombu.binding.celeryev
 1) "worker.#\x06\x16\x06\x16celeryev.64089900-d397-4564-b343-742664c1b214"
127.0.0.1:6379> smembers _kombu.binding.celery
1) "celery\x06\x16\x06\x16celery"
127.0.0.1:6379> smembers _kombu.binding.celery.pidbox
1) "\x06\x16\x06\[email protected]"
127.0.0.1:6379>
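
That is because the redis transport implements fanout exchanges on top of Redis pub/sub, so events exist only in flight. One way to observe them, sketched here with the redis-py client, is to pattern-subscribe to everything (the pub/sub channel names are kombu transport internals):

import redis

r = redis.Redis(host='localhost', port=6379)
p = r.pubsub()
p.psubscribe('*')  # kombu delivers fanout messages over pub/sub channels

for message in p.listen():
    if message['type'] == 'pmessage':
        # message['data'] is the serialized event envelope
        print(message['channel'], message['data'][:120])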

At this point, the EventDispatcher component has sent the event out.

How will this event be handled? For that we need to look at the Events component.

0x05 The Events component

5.1 What Events are good for

As mentioned earlier, Celery emits an Event whenever a Task or Worker changes state. The most obvious application is monitoring those events: flower, the well-known web-based management tool for Celery, is built on Events. Beyond monitoring, we can use Events to take snapshots of tasks, or even react to task state transitions in real time, for example triggering an alert when a task fails, or running dependent tasks once a task succeeds. To summarize, Events let us:

  • take snapshots of Task state;
  • process Task state changes in real time;
  • monitor the running state of Celery (Workers/Tasks);
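
As a sketch of the real-time case, the following monitor (following the pattern described in the Celery user guide; the printed message format is made up) announces failed tasks while keeping the cluster state up to date:

from celery import Celery

app = Celery('myTest', broker='redis://localhost:6379//')

def my_monitor(app):
    state = app.events.State()

    def announce_failed_tasks(event):
        state.event(event)  # let State track this event too
        task = state.tasks.get(event['uuid'])
        print(f'TASK FAILED: {task.name}[{task.uuid}] {task.info()!r}')

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-failed': announce_failed_tasks,
            '*': state.event,  # everything else just updates the state
        })
        recv.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    my_monitor(app)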

5.2 Debugging

celery events can be used to start a snapshot camera, or to dump events to standard output.

For example:

celery -A proj events -c myapp.DumpCam --frequency=2.0

celery -A proj events --camera=<camera-class> --frequency=1.0

celery -A proj events --dump 
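
The myapp.DumpCam referenced above assumes a camera class roughly like this sketch, adapted from the camera example in the Celery monitoring guide:

from pprint import pformat

from celery.events.snapshot import Polaroid

class DumpCam(Polaroid):
    clear_after = True  # clear state between shots

    def on_shutter(self, state):
        if not state.event_count:
            return  # no new events since the last snapshot
        print('Workers: {}'.format(pformat(state.workers, indent=4)))
        print('Tasks: {}'.format(pformat(state.tasks, indent=4)))
        print('Total: {0.event_count} events, {0.task_count} tasks'.format(
            state))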

For debugging, we start it as follows:

app.start(argv=['events'])

The command is implemented as:

def events(ctx, dump, camera, detach, frequency, maxrate, loglevel, **kwargs):
    """Event-stream utilities."""
    app = ctx.obj.app
    if dump:
        return _run_evdump(app)

    if camera:
        return _run_evcam(camera, app=app, freq=frequency, maxrate=maxrate,
                          loglevel=loglevel,
                          detach=detach,
                          **kwargs)

    return _run_evtop(app)

5.3 Entry point

The Events entry point is:

def _run_evtop(app):
    try:
        from celery.events.cursesmon import evtop
        _set_process_status('top')
        return evtop(app=app)
    except ModuleNotFoundError:
        # abridged here: the real source reports that the curses module
        # is required for this command
        raise

Let's follow it further.

def evtop(app=None):  # pragma: no cover
    """Start curses monitor."""
    app = app_or_default(app)
    state = app.events.State()
    display = CursesMonitor(state, app)
    display.init_screen()
    refresher = DisplayThread(display)
    refresher.start()
   
    capture_events(app, state, display)

5.4 The event loop

We now arrive at the event loop.

An app.events.Receiver is created here.

Note that the handlers={'*': state.event} passed to the Receiver provides the handler functions used later when events are processed.

def capture_events(app, state, display):  # pragma: no cover

    while 1:
        with app.connection_for_read() as conn:
            try:
                conn.ensure_connection(on_connection_error,
                                       app.conf.broker_connection_max_retries)
                
                recv = app.events.Receiver(conn, handlers={'*': state.event})
                
                display.resetscreen()
                display.init_screen()
                
                recv.capture()
                
            except conn.connection_errors + conn.channel_errors as exc:
                print(f'Connection lost: {exc!r}', file=sys.stderr)

So recv.capture() ends up being called in a loop.

Schematically:

Events


   +--------------------+
   |      loop          |
   |                    |
   |                    |
   |                    |
   |                    |
   |                    v
   |
   |        EventReceiver.capture()
   |
   |                    +
   |                    |
   |                    |
   |                    |
   |                    |
   |                    |
   |                    |
   +--------------------+

5.5 EventReceiver

EventReceiver receives events and processes them. Note that EventReceiver inherits from ConsumerMixin.

class EventReceiver(ConsumerMixin):
    """Capture events.

    Arguments:
        connection (kombu.Connection): Connection to the broker.
        handlers (Mapping[Callable]): Event handlers.
            This is  a map of event type names and their handlers.
            The special handler `"*"` captures all events that don't have a
            handler.
    """

Its code is as follows:

    def capture(self, limit=None, timeout=None, wakeup=True):
        """Open up a consumer capturing events.

        This has to run in the main process, and it will never stop
        unless :attr:`EventDispatcher.should_stop` is set to True, or
        forced via :exc:`KeyboardInterrupt` or :exc:`SystemExit`.
        """
        for _ in self.consume(limit=limit, timeout=timeout, wakeup=wakeup):
            pass

The corresponding variables are:

self.consume = {method} <bound method ConsumerMixin.consume of <celery.events.receiver.EventReceiver object at 0x000001CA8C22AB08>>

self = {EventReceiver} <celery.events.receiver.EventReceiver object at 0x000001CA8C22AB08>

So ConsumerMixin is used to handle the events. As we guessed at the beginning of the article: where there is a kombu producer, there must be a kombu consumer.

In fact, several EventReceivers may be bound to this Connection, and ConsumerMixin coordinates these receivers. Every receiver sees the events, but whether it can process one depends on how well its routing_key is set up.

So the picture is:

Events


   +--------------------+
   |      loop          |
   |                    |
   |                    |
   |                    |
   |                    |
   |                    v
   |
   |     EventReceiver(ConsumerMixin).capture()
   |
   |                    +
   |                    |
   |                    |
   |                    |
   |                    |
   |                    |
   |                    |
   +--------------------+

5.6 ConsumerMixin

ConsumerMixin is a convenience mixin class provided by Kombu for implementing consumer programs.

class ConsumerMixin:
    """Convenience mixin for implementing consumer programs.

    It can be used outside of threads, with threads, or greenthreads
    (eventlet/gevent) too.

    The basic class would need a :attr:`connection` attribute
    which must be a :class:`~kombu.Connection` instance,
    and define a :meth:`get_consumers` method that returns a list
    of :class:`kombu.Consumer` instances to use.
    Supporting multiple consumers is important so that multiple
    channels can be used for different QoS requirements.
	"""

The code lives in kombu\mixins.py; a standalone usage sketch follows the consume code below:

    def consume(self, limit=None, timeout=None, safety_interval=1, **kwargs):
        elapsed = 0
        with self.consumer_context(**kwargs) as (conn, channel, consumers):
            for i in limit and range(limit) or count():
                if self.should_stop:
                    break
                self.on_iteration()
                try:
                    conn.drain_events(timeout=safety_interval)
                except socket.timeout:
                    conn.heartbeat_check()
                    elapsed += safety_interval
                    if timeout and elapsed >= timeout:
                        raise
                except OSError:
                    if not self.should_stop:
                        raise
                else:
                    yield
                    elapsed = 0
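
For reference, a minimal standalone consumer built on ConsumerMixin looks roughly like this (following the usage pattern from the kombu documentation; the names are illustrative):

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin

class DemoReceiver(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection  # attribute required by the mixin
        self.queues = queues

    def get_consumers(self, Consumer, channel):
        # The mixin calls this to obtain the kombu.Consumer instances.
        return [Consumer(queues=self.queues,
                         callbacks=[self.on_message], accept=['json'])]

    def on_message(self, body, message):
        print('got event:', body)
        message.ack()

if __name__ == '__main__':
    ex = Exchange('demo_ev', type='fanout')
    with Connection('redis://localhost:6379//') as conn:
        DemoReceiver(conn, [Queue('demo_ev.listener', exchange=ex)]).run()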

5.6.1 Consumer

Internally, ConsumerMixin builds its Consumer as follows:

    @contextmanager
    def Consumer(self):
        with self.establish_connection() as conn:
            self.on_connection_revived()

            channel = conn.default_channel
            cls = partial(Consumer, channel,
                          on_decode_error=self.on_decode_error)
            with self._consume_from(*self.get_consumers(cls, channel)) as c:
                yield conn, channel, c

            self.on_consume_end(conn, channel)

When the consumer is built, self._receive is set as the Consumer callback:

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=[self.queue],
                         callbacks=[self._receive], no_ack=True,
                         accept=self.accept)]

The call stack is:

get_consumers, receiver.py:72
Consumer, mixins.py:230
__enter__, contextlib.py:112
consumer_context, mixins.py:181
__enter__, contextlib.py:112
consume, mixins.py:188
capture, receiver.py:91
evdump, dumper.py:95
_run_evdump, events.py:21
events, events.py:87
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
<module>, myEvent.py:18

The variables at this point are:

self.consume = {method} <bound method ConsumerMixin.consume of <celery.events.receiver.EventReceiver object at 0x000001FE106E06C8>>
self.queue = {Queue} <unbound Queue celeryev.6e24485e-9f27-46e1-90c9-6b52f44b9902 -> <unbound Exchange celeryev(fanout)> -> #>
self._receive = {method} <bound method EventReceiver._receive of <celery.events.receiver.EventReceiver object at 0x000001FE106E06C8>>
Consumer = {partial} functools.partial(<class 'kombu.messaging.Consumer'>, <kombu.transport.redis.Channel object at 0x000001FE1080CC08>, on_decode_error=<bound method ConsumerMixin.on_decode_error of <celery.events.receiver.EventReceiver object at 0x000001FE106E06C8>>)
channel = {Channel} <kombu.transport.redis.Channel object at 0x000001FE1080CC08>
self = {EventReceiver} <celery.events.receiver.EventReceiver object at 0x000001FE106E06C8>

At this point:

 Events


+-----------------------------------------+
| EventReceiver(ConsumerMixin)            |
|                                         |
|                                         |
|                                         |  consume
|                                         |               +------------------+
|                            capture  +-----------------> | Consumer         |
|                                         |               |                  |
|                                         |               |                  |
|                                         |               |                  |
|                           _receive  <----------------------+ callbacks     |
|                                         |               |                  |
|                                         |               |                  |
|                                         |               +------------------+
+-----------------------------------------+

5.7 Receiving

When an event arrives, _receive is called to receive it:

    def _receive(self, body, message, list=list, isinstance=isinstance):
        if isinstance(body, list):  # celery 4.0+: List of events
            process, from_message = self.process, self.event_from_message
            [process(*from_message(event)) for event in body]
        else:
            self.process(*self.event_from_message(body))

5.8 Processing

Once received, the event can be processed:

    def process(self, type, event):
        """Process event by dispatching to configured handler."""
        handler = self.handlers.get(type) or self.handlers.get('*')
        handler and handler(event)
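
The dispatch falls back to the '*' catch-all when no type-specific handler exists; a toy illustration:

handlers = {
    'task-failed': lambda event: print('failed:', event),
    '*': lambda event: print('catch-all:', event),
}

def process(type, event):
    handler = handlers.get(type) or handlers.get('*')
    handler and handler(event)

process('task-failed', {'uuid': '42'})    # -> failed: {'uuid': '42'}
process('worker-online', {'pid': 26144})  # -> catch-all: {'pid': 26144}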

At this point the picture is as follows. The Receiver.handlers here is the handlers={'*': state.event} passed in when the Receiver was created; it supplies the functions that process events.

Events


+-----------------------------------------+
| EventReceiver(ConsumerMixin)            |
|                                         |
|                                         |
|                                         |  consume
|                                         |               +------------------+
|                            capture  +-----------------> | Consumer         |
|                                         |               |                  |
|                                         |               |                  |
|                                         |               |                  |
|                           _receive  <----------------------+ callbacks     |
|                                         |               |                  |
|                                         |               |                  |
|                                         |               +------------------+
|                                         |
|                            handlers +------------+
|                                         |        |      +------------------+
+-----------------------------------------+        |      |state             |
                                                   |      |                  |
                                                   |      |                  |
                                                   +-------->event           |
                                                          |                  |
                                                          |                  |
                                                          +------------------+

5.9 The state handler

It is defined as follows:

    @cached_property
    def _event(self):
        return self._create_dispatcher()

In summary, the logic is:

  1. First look for a handler for the group; if one exists, use it. By default there is none, so this step can be skipped.
  2. If it is a worker event, run the worker-specific handling.
  3. If it is a task event, run the task-specific handling.

The dispatcher is built as follows:

    def _create_dispatcher(self):
        # noqa: C901
        # pylint: disable=too-many-statements
        # This code is highly optimized, but not for reusability.
        get_handler = self.handlers.__getitem__
        event_callback = self.event_callback
        wfields = itemgetter('hostname', 'timestamp', 'local_received')
        tfields = itemgetter('uuid', 'hostname', 'timestamp',
                             'local_received', 'clock')
        taskheap = self._taskheap
        th_append = taskheap.append
        th_pop = taskheap.pop
        # Removing events from task heap is an O(n) operation,
        # so easier to just account for the common number of events
        # for each task (PENDING->RECEIVED->STARTED->final)
        #: an O(n) operation
        max_events_in_heap = self.max_tasks_in_memory * self.heap_multiplier
        add_type = self._seen_types.add
        on_node_join, on_node_leave = self.on_node_join, self.on_node_leave
        tasks, Task = self.tasks, self.Task
        workers, Worker = self.workers, self.Worker
        # avoid updating LRU entry at getitem
        get_worker, get_task = workers.data.__getitem__, tasks.data.__getitem__

        get_task_by_type_set = self.tasks_by_type.__getitem__
        get_task_by_worker_set = self.tasks_by_worker.__getitem__

        def _event(event,
                   timetuple=timetuple, KeyError=KeyError,
                   insort=bisect.insort, created=True):
            self.event_count += 1
            if event_callback:
                event_callback(self, event)
            group, _, subject = event['type'].partition('-')
            try:
                handler = get_handler(group)
            except KeyError:
                pass
            else:
                return handler(subject, event), subject

            if group == 'worker':
                try:
                    hostname, timestamp, local_received = wfields(event)
                except KeyError:
                    pass
                else:
                    is_offline = subject == 'offline'
                    try:
                        worker, created = get_worker(hostname), False
                    except KeyError:
                        if is_offline:
                            worker, created = Worker(hostname), False
                        else:
                            worker = workers[hostname] = Worker(hostname)
                    worker.event(subject, timestamp, local_received, event)
                    if on_node_join and (created or subject == 'online'):
                        on_node_join(worker)
                    if on_node_leave and is_offline:
                        on_node_leave(worker)
                        workers.pop(hostname, None)
                    return (worker, created), subject
            elif group == 'task':
                (uuid, hostname, timestamp,
                 local_received, clock) = tfields(event)
                # task-sent event is sent by client, not worker
                is_client_event = subject == 'sent'
                try:
                    task, task_created = get_task(uuid), False
                except KeyError:
                    task = tasks[uuid] = Task(uuid, cluster_state=self)
                    task_created = True
                if is_client_event:
                    task.client = hostname
                else:
                    try:
                        worker = get_worker(hostname)
                    except KeyError:
                        worker = workers[hostname] = Worker(hostname)
                    task.worker = worker
                    if worker is not None and local_received:
                        worker.event(None, local_received, timestamp)

                origin = hostname if is_client_event else worker.id

                # remove oldest event if exceeding the limit.
                heaps = len(taskheap)
                if heaps + 1 > max_events_in_heap:
                    th_pop(0)

                # most events will be dated later than the previous.
                timetup = timetuple(clock, timestamp, origin, ref(task))
                if heaps and timetup > taskheap[-1]:
                    th_append(timetup)
                else:
                    insort(taskheap, timetup)

                if subject == 'received':
                    self.task_count += 1
                task.event(subject, timestamp, local_received, event)
                task_name = task.name
                if task_name is not None:
                    add_type(task_name)
                    if task_created:  # add to tasks_by_type index
                        get_task_by_type_set(task_name).add(task)
                        get_task_by_worker_set(hostname).add(task)
                if task.parent_id:
                    try:
                        parent_task = self.tasks[task.parent_id]
                    except KeyError:
                        self._add_pending_task_child(task)
                    else:
                        parent_task.children.add(task)
                try:
                    _children = self._tasks_to_resolve.pop(uuid)
                except KeyError:
                    pass
                else:
                    task.children.update(_children)

                return (task, task_created), subject
        return _event

Putting this together:

 Events


+-----------------------------+
| EventReceiver(ConsumerMixin |
|                             |
|                             |               +------------------+
|                             |  consume      | Consumer         |
|                             |               |                  |
|                capture  +-----------------> |                  |
|                             |               |                  |
|                             |               |                  |
|                             |               |                  |
|               _receive  <----------------------+ callbacks     |
|                             |               |                  |
|                             |               |                  |
|                             |               +------------------+
|                             |
|                handlers +------------+
|                             |        |      +------------------------+
+-----------------------------+        |      |state                   |
                                       |      |                        |
                                       |      |                        |
                                       +---------> event +---+         |
                                              |              |         |
                                              |              |         |
                                              |              v         |
                                              |     _create_dispatcher |
                                              |              +         |
                                              |              |         |
                                              |              |         |
                                              |              |         |
                                              +------------------------+
                                                             |
                                                             |
                                                    +--------+------+
                                group == 'worker'   |               | group == 'task'
                                                    |               |
                                                    v               v
                                          worker.event          task.event

Finally, the overall logic is as follows:

                     Producer Scope   +         Broker      +   Consumer Scope
                                      |                     |
+-----------------------------+       |     Redis pubsub    |     Events
| EventDispatcher             |       |                     |
|                             |       |                     |     +-----------------------------+
|                             |       |                     |     | EventReceiver(ConsumerMixin |
|                             |       |                     |     |                             |
|        connection           |       |                     |     |                             |               +------------------+
|                             |       |                     |     |                             |  consume      | Consumer         |
|        channel              |       |                     |     |                             |               |                  |
|                             |       |                     |     |                capture  +-----------------> |                  |
|        producer  +----------------------->  Event +-----------> |                             |               |                  |
|                             |       |                     |     |                             |               |                  |
|        exchange             |       |                     |     |                             |               |                  |
|                             |       |                     |     |               _receive  <----------------------+ callbacks     |
|        hostname             |       |                     |     |                             |               |                  |
|                             |       |                     |     |                             |               |                  |
|        groups               |       |                     |     |                             |               +------------------+
|                             |       |                     |     |                             |
|        _outbound_buffer     |       |                     |     |                handlers +------------+
|                             |       |                     |     |                             |        |      +------------------------+
|        clock                |       |                     |     +-----------------------------+        |      |state                   |
|                             |       |                     |                                            |      |                        |
+-----------------------------+       |                     |                                            |      |                        |
                                      |                     |                                            +---------> event +---+         |
                                      |                     |                                                   |              |         |
                                      |                     |                                                   |              |         |
                                      |                     |                                                   |              v         |
                                      |                     |                                                   |     _create_dispatcher |
                                      |                     |                                                   |              +         |
                                      |                     |                                                   |              |         |
                                      |                     |                                                   |              |         |
                                      |                     |                                                   |              |         |
                                      |                     |                                                   +------------------------+
                                      |                     |                                                                  |
                                      |                     |                                                                  |
                                      |                     |                                                         +--------+------+
                                      |                     |                                     group == 'worker'   |               | group == 'task'
                                      |                     |                                                         |               |
                                      |                     |                                                         v               v
                                      +                     +                                               worker.event          task.event


This concludes the walkthrough of Celery's two internal components for sending events and for receiving and processing them.

