[Source Code Analysis] Distributed Task Queue Celery: Starting the Consumer
0x00 Abstract
Celery is a simple, flexible, and reliable distributed system for processing large volumes of messages. It is an asynchronous task queue focused on real-time processing, and it also supports task scheduling. In this article we look at the startup of the Celery Worker's Consumer component.
We first give an overview of the startup process, and then pick three representative sub-components of the Consumer (configuring the network, starting the tasks, and starting consumption of tasks) and examine their startup in more detail. This should give you a deeper understanding of how the Consumer fits together.
0x01 Overview
The Celery Worker is the consumer that executes tasks; multiple workers usually run on several servers to increase throughput. So how does a worker fetch messages from the broker? That is what a consumer is for.
We first give a diagram of the consumer to form an initial impression.
We already know that Kombu implements the two concepts of Producer and Consumer. We can therefore infer that Celery's implementation must use Kombu's Producer and Consumer.
1.1 kombu.consumer
Let's recall what kombu.Consumer and its related classes do:
- Exchange: the MQ router. Message producers send messages to an Exchange, and the Exchange is responsible for routing them to queues.
- Queue: the queue abstraction. It stores the messages that will be consumed by the application; the Exchange distributes messages to Queues, and consumers receive messages from Queues.
- Consumer: the abstraction that receives messages. A consumer declares a queue, binds the queue to a given exchange, and then receives messages from that queue. In other words, from the user's point of view, once you know an exchange you can read messages from it; concretely, the messages are read from the queue.
In the concrete Consumer implementation, the queue is tied to a channel. The queue holds a channel used to access Redis, and it also holds an Exchange, which knows which Redis key to access (the key corresponding to the queue).
The Consumer consumes messages through the Queue, and the Queue in turn delegates to the Channel.
That is, once the exchange and queue are configured, the channel knows which Redis key corresponds to which consumer. When there is a message on the connection, the consumer's callback is invoked.
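To make this concrete, here is a minimal standalone kombu consumer. This is a sketch assuming a local Redis broker; the 'celery' queue/exchange names simply mirror Celery's defaults:
from kombu import Connection, Consumer, Exchange, Queue

exchange = Exchange('celery', type='direct')
queue = Queue('celery', exchange, routing_key='celery')

def on_message(body, message):
    # the callback invoked when a message arrives on the queue
    print('received:', body)
    message.ack()

with Connection('redis://localhost:6379//') as conn:
    with Consumer(conn, queues=[queue], callbacks=[on_message], accept=['json']):
        conn.drain_events(timeout=5)  # block until a message arrives (or timeout)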
1.2 Celery Consumer
Note: the Celery Consumer component is not Kombu's Consumer; rather, it uses Kombu's Consumer to fetch messages from the broker.
The Celery Consumer component is a much larger concept than Kombu's Consumer. Beyond fetching messages from the broker, it also covers message consumption, dispatching, monitoring, heartbeats, and a series of other features.
You could say that apart from the message loop engine (handled by the hub), multiprocessing (handled by Pool and Autoscaler), and scheduled tasks (handled by timer and beat), all the other main functionality is handled by the Consumer.
0x02 start in worker
Let's go back to the previous article: once the Worker has finished initializing, it goes on to call start.
worker.start()
Its code is as follows:
def start(self):
    try:
        self.blueprint.start(self)
        ......
This invokes blueprint.start (the blueprint decides the execution order of its sub-modules). Since the Consumer is one of the worker's components, this eventually reaches the Consumer's start.
The call stack is as follows:
start, consumer.py:300
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
0x03 start in consumer
Now we arrive at the Consumer itself.
The code is located at: celery/worker/consumer/consumer.py
We know that the Consumer also has its own Steps internally, as follows:
class Consumer:
    """Consumer blueprint."""

    Strategies = dict

    #: Optional callback called the first time the worker
    #: is ready to receive tasks.
    init_callback = None

    #: The current worker pool instance.
    pool = None

    #: A timer used for high-priority internal tasks, such
    #: as sending heartbeats.
    timer = None

    class Blueprint(bootsteps.Blueprint):
        """Consumer blueprint."""

        name = 'Consumer'
        default_steps = [
            'celery.worker.consumer.connection:Connection',
            'celery.worker.consumer.mingle:Mingle',
            'celery.worker.consumer.events:Events',
            'celery.worker.consumer.gossip:Gossip',
            'celery.worker.consumer.heart:Heart',
            'celery.worker.consumer.control:Control',
            'celery.worker.consumer.tasks:Tasks',
            'celery.worker.consumer.consumer:Evloop',
            'celery.worker.consumer.agent:Agent',
        ]
So when the Worker starts the Consumer, the Consumer's start is called.
Specifically:
def start(self):
    blueprint = self.blueprint
    while blueprint.state not in STOP_CONDITIONS:
        maybe_shutdown()
        if self.restart_count:
            try:
                self._restart_state.step()
            except RestartFreqExceeded as exc:
                crit('Frequent restarts detected: %r', exc, exc_info=1)
                sleep(1)
        self.restart_count += 1
        try:
            blueprint.start(self)  # this is the key call
        except self.connection_errors as exc:
The following line calls blueprint.start:
blueprint.start(self)
3.1 start consumer.blueprint
The code is located at: celery-master/celery/bootsteps.py
def start(self, parent):
    self.state = RUN
    if self.on_start:
        self.on_start()
    for i, step in enumerate(s for s in parent.steps if s is not None):
        self.started = i + 1
        step.start(parent)
So it iterates over every step and starts each one.
Let's now briefly describe what these steps do:
- 【1】Connection: manages the connection to the broker
- 【3】Events: used to send monitoring events
- 【2】Agent: cell actor
- 【2】Mingle: synchronizes state between different workers
- 【1】Tasks: starts the message Consumer
- 【3】Gossip: consumes events from other workers
- 【1】Heart: sends heartbeat events (the consumer's heartbeat)
- 【3】Control: the remote command management service
Reference article 1, "Overview of the Worker startup process", puts it this way:
Here I have numbered every Bootstep; the number indicates how important the service is for reading the code: 1 is the most important, 3 the least. For the Consumer:
1 marks basic functionality; together these form a simple, not particularly robust message queue framework;
2 marks generally important features used to implement somewhat more advanced functionality;
3 marks add-on features, which are also somewhat distributed features.
Since the component behind every step is actually quite complex, later articles will explain them in detail. This article only outlines the most important steps, essentially the message-loop components: the connection component needed for reading from the broker, and the task component needed for processing messages. Note that this blueprint machinery is also user-extensible, as the sketch below shows.
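As an aside on the mechanism itself, here is a minimal sketch of a custom worker bootstep. InfoStep is a hypothetical name of ours; app.steps['worker'].add is Celery's documented extension point:
from celery import Celery, bootsteps

class InfoStep(bootsteps.StartStopStep):
    """Hypothetical step: logs when the blueprint starts/stops it."""

    def start(self, parent):
        print('InfoStep starting on', parent)

    def stop(self, parent):
        print('InfoStep stopping on', parent)

app = Celery('myTest', broker='redis://localhost:6379//')
app.steps['worker'].add(InfoStep)  # the blueprint starts it along with the other steps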
3.2 The Connection step sub-component
This sub-component mainly handles the network interaction.
Curiously, Connection has no logic of its own here; it hands everything over to the Consumer class.
The parameter c of start is the consumer, so the start method calls the consumer's connect method and stores the result in the consumer's connection member variable.
At this point the connection is established. Ultimately a celery.app.amqp.Connection instance is created, which in fact uses a kombu Connection to connect to the queue. Once established, the Connection is registered into the event loop of kombu's Transport.
In this way, the Consumer is linked to the broker.
class Connection(bootsteps.StartStopStep):
    """Service managing the consumer broker connection."""

    def start(self, c):
        c.connection = c.connect()
3.2.1 connect in consumer
The code is in celery/worker/consumer/consumer.py.
As you can see, it does the following:
- creates a celery.app.amqp.Connection instance with the heartbeat as a parameter, i.e. obtains a kombu Connection, establishing the connection if necessary;
- configures the resulting Connection for asynchronous use;
- returns the resulting Connection.
The code is as follows:
def connect(self):
    """Establish the broker connection used for consuming tasks."""
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)  # with heartbeat
    if self.hub:
        # register the connection with the async event loop
        conn.transport.register_with_event_loop(conn.connection, self.hub)
    return conn  # return the connection

def connection_for_read(self, heartbeat=None):
    return self.ensure_connected(
        self.app.connection_for_read(heartbeat=heartbeat))  # ensure connected
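The same wiring can be reproduced with plain kombu outside of Celery; a minimal sketch, assuming a local Redis broker:
from kombu import Connection
from kombu.asynchronous import Hub

hub = Hub()
conn = Connection('redis://localhost:6379//', heartbeat=60)
conn.ensure_connection(max_retries=3)  # kombu connections are lazy; force the connect
# hook the transport's file descriptors into the event loop
conn.transport.register_with_event_loop(conn.connection, hub)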
3.2.2 connect in celery
The goal of this subsection is to obtain a Connection.
The app above is the Celery application, so now we arrive at the Celery application itself.
The code is in celery/app/base.py:
def connection_for_read(self, url=None, **kwargs):
    """Establish connection used for consuming."""
    return self._connection(url or self.conf.broker_read_url, **kwargs)
Which leads to:
def _connection(self, url, userid=None, password=None,
                virtual_host=None, port=None, ssl=None,
                connect_timeout=None, transport=None,
                transport_options=None, heartbeat=None,
                login_method=None, failover_strategy=None, **kwargs):
    conf = self.conf
    return self.amqp.Connection(
        url,
        userid or conf.broker_user,
        password or conf.broker_password,
        virtual_host or conf.broker_vhost,
        port or conf.broker_port,
        transport=transport or conf.broker_transport,
        ssl=self.either('broker_use_ssl', ssl),
        heartbeat=heartbeat,
        login_method=login_method or conf.broker_login_method,
        failover_strategy=(
            failover_strategy or conf.broker_failover_strategy
        ),
        transport_options=dict(
            conf.broker_transport_options, **transport_options or {}
        ),
        connect_timeout=self.either(
            'broker_connection_timeout', connect_timeout
        ),
    )
As you can see, whether it is the Connection seen by the Celery application or the one seen by the Consumer, it is ultimately amqp.Connection, i.e. 'kombu.connection.Connection'.
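This is easy to verify from user code; a sketch assuming a local Redis broker:
from celery import Celery

app = Celery('myTest', broker='redis://localhost:6379//')
conn = app.connection_for_read(heartbeat=60)
print(type(conn))  # <class 'kombu.connection.Connection'>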
Here the self.amqp variable looks like the following; you can see it is all kombu-related.
self.amqp = {AMQP} <celery.app.amqp.AMQP object at 0x7ffd556db7f0>
BrokerConnection = {type} <class 'kombu.connection.Connection'>
Connection = {type} <class 'kombu.connection.Connection'>
Consumer = {type} <class 'kombu.messaging.Consumer'>
Producer = {type} <class 'kombu.messaging.Producer'>
app = {Celery} <Celery tasks at 0x7ffd557f3da0>
argsrepr_maxsize = {int} 1024
autoexchange = {NoneType} None
default_exchange = {Exchange} Exchange celery(direct)
default_queue = {Queue} <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>
kwargsrepr_maxsize = {int} 1024
producer_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x7ffd56788748>
publisher_pool = {ProducerPool} <kombu.pools.ProducerPool object at 0x7ffd56788748>
queues = {Queues: 1} {'celery': <unbound Queue celery -> <unbound Exchange celery(direct)> -> celery>}
queues_cls = {type} <class 'celery.app.amqp.Queues'>
router = {Router} <celery.app.routes.Router object at 0x7ffd56799898>
routes = {tuple: 0} ()
task_protocols = {dict: 2} {1: <bound method AMQP.as_task_v1 of <celery.app.amqp.AMQP object at 0x7ffd556db7f0>>, 2: <bound method AMQP.as_task_v2 of <celery.app.amqp.AMQP object at 0x7ffd556db7f0>>}
utc = {bool} True
We obtain a 'kombu.connection.Connection':
<Connection: redis://localhost:6379// at 0x7ffd567827b8>
Then the connection is established:
def ensure_connected(self, conn):
    # Callback called for each retry while the connection
    # can't be established.
    def _error_handler(exc, interval, next_step=CONNECTION_RETRY_STEP):
        if getattr(conn, 'alt', None) and interval == 0:
            next_step = CONNECTION_FAILOVER
        next_step = next_step.format(
            when=humanize_seconds(interval, 'in', ' '),
            retries=int(interval / 2),
            max_retries=self.app.conf.broker_connection_max_retries)
        error(CONNECTION_ERROR, conn.as_uri(), exc, next_step)

    # remember that the connection is lazy, it won't establish
    # until needed.
    if not self.app.conf.broker_connection_retry:
        # retry disabled, just call connect directly.
        conn.connect()
        return conn

    conn = conn.ensure_connection(
        _error_handler, self.app.conf.broker_connection_max_retries,
        callback=maybe_shutdown,
    )
    return conn
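Since the connection is lazy, ensure_connection is what actually forces (and retries) the connect. A standalone sketch of the same retry pattern:
from kombu import Connection

def errback(exc, interval):
    # called before each retry, with the exception and the sleep interval
    print('connection failed: %r, retrying in %ss' % (exc, interval))

conn = Connection('redis://localhost:6379//')
conn = conn.ensure_connection(errback, max_retries=5)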
The call stack is as follows:
ensure_connected, consumer.py:414
connection_for_read, consumer.py:405
connect, consumer.py:398
start, connection.py:21
start, bootsteps.py:116
start, consumer.py:311
start, bootsteps.py:365
start, bootsteps.py:116
start, worker.py:204
worker, worker.py:327
caller, base.py:132
new_func, decorators.py:21
invoke, core.py:610
invoke, core.py:1066
invoke, core.py:1259
main, core.py:782
start, base.py:358
worker_main, base.py:374
<module>, myTest.py:26
Finally we obtain a working connection.
For Read
+------------------------------------------+ +------------------------------------------------------------------------+
+--------+ | [Consumer] Connection+-------> |Connection: redis://localhost:6379// class 'kombu.connection.Connection'|
| Gossip +<-----+ | +------------------------------------------------------------------------+
+--------+ | | +----------+
| +--------> | | ^
| | | Events | |
+-------+ | | | | |
| Tasks | <-----+ Timer Strategies Blueprint | +----------+ |
+-------+ | | |
| | |
+-------+ | pool hub loop app | +-------+ |
| Heart | <-----+ + + + + +--------> | Agent | |
+-------+ | | | | | | +-------+ |
| | | | | | +---------+ |
+------------------------------------------+--------> | Mingle | |
| | | | +---------+ |
| | | | |
v v v v |
|
+-----------------+ +-----+ +----------------+ +---------------+ +--------------+ |
| prefork.TaskPool| | Hub | | loops.asynloop | | Celery | | AMQP | |
+-----------------+ +-----+ +----------------+ | | | | |
| amqp +-----------> | Connection+------+
+---------------+ +--------------+
3.2.3 Using asynchronous calls
The following code uses the kombu Connection to connect to the queue. Once the connection is established, the Connection is registered into the event loop of kombu's Transport.
if self.hub:
    # register the connection with the async event loop
    conn.transport.register_with_event_loop(conn.connection, self.hub)
So finally the picture is:
For Read
+------------------------------------------+ +------------------------------------------------------------------------+
+--------+ | [Consumer] Connection+-------> |Connection: redis://localhost:6379// class 'kombu.connection.Connection'|
| Gossip +<-----+ | +------------------------------------------------------------------------+
+--------+ | | +----------+
| +--------> | | ^ ^
| | | Events | | |
+-------+ | | | | | |
| Tasks | <-----+ Timer Strategies Blueprint | +----------+ | |
+-------+ | | | |
| | | |
+-------+ | pool hub loop app | +-------+ | |
| Heart | <-----+ + + + + +--------> | Agent | | |
+-------+ | | | | | | +-------+ | |
| | | | | | +---------+ | |
+------------------------------------------+--------> | Mingle | | |
| | | | +---------+ | |
| | | | | |
v v v v | |
| |
+-----------------+ +-------+ +----------------+ +---------------+ +--------------+ | |
| prefork.TaskPool| | | | loops.asynloop | | Celery | | AMQP | | |
+-----------------+ | Hub | +----------------+ | | | | | |
| | | amqp +-----------> | Connection+------+ |
| | +---------------+ +--------------+ |
+---+---+ |
| |
+---------------------------------------------------------------------------------------->+
3.3 The Tasks step sub-component
Since the network connection is now configured, this part brings in the various tasks; let's analyze how it is started.
The content of c is <Consumer: celery (running)>, so the operations below act on the Consumer.
First we need to introduce how Celery handles tasks.
After Celery starts, it looks through the code for the classes or functions annotated with @task and registers them in a global callback set, _on_app_finalizers. This set collects all the task classes.
At this point Celery knows which tasks exist and has collected them into _on_app_finalizers, but it does not yet understand their semantics. Put another way, Celery only knows the classes; it has no instances of them yet, and that link still needs to be made.
So Celery runs the callbacks in the global set _on_app_finalizers to obtain task instances, and adds them to Celery's task registry, tasks.
This tasks registry is what is used later when consuming messages: given the task name supplied by the client, Celery looks up the concrete task instance and handles it.
self._tasks = {TaskRegistry: 10}
NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7fb652da5fd0>
'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7fb652da5fd0>
'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7fb652da5fd0>
'celery.group' = {group} <@task: celery.group of myTest at 0x7fb652da5fd0>
'celery.map' = {xmap} <@task: celery.map of myTest at 0x7fb652da5fd0>
'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7fb652da5fd0>
'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7fb652da5fd0>
'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7fb652da5fd0>
'myTest.add' = {add} <@task: myTest.add of myTest at 0x7fb652da5fd0>
'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7fb652da5fd0>
__len__ = {int} 10
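The registration flow looks like this from the user side; a sketch in which the task name myTest.add mirrors the dump above:
from celery import Celery

app = Celery('myTest', broker='redis://localhost:6379//')

@app.task
def add(x, y):
    return x + y

app.finalize()  # runs the pending callbacks collected in _on_app_finalizers
print('myTest.add' in app.tasks)  # True: the task instance is now registered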
So next, let's look at how the task component is started.
3.3.1 start
Tasks starts as follows:
- update the known task strategies;
- obtain the kombu.Consumer, i.e. c.task_consumer;
- start consuming.
Specifically:
class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer."""

    requires = (Mingle,)

    def __init__(self, c, **kwargs):
        c.task_consumer = c.qos = None
        super().__init__(c, **kwargs)

    def start(self, c):
        """Start task consumer."""
        c.update_strategies()  # update the known tasks

        # - RabbitMQ 3.3 completely redefines how basic_qos works...
        # This will detect if the new qos semantics is in effect,
        # and if so make sure the 'apply_global' flag is set on qos updates.
        qos_global = not c.connection.qos_semantics_matches_spec

        # set the initial prefetch count
        c.connection.default_channel.basic_qos(
            0, c.initial_prefetch_count, qos_global,
        )

        c.task_consumer = c.app.amqp.TaskConsumer(
            c.connection, on_decode_error=c.on_decode_error,
        )  # start consuming

        def set_prefetch_count(prefetch_count):
            return c.task_consumer.qos(
                prefetch_count=prefetch_count,
                apply_global=qos_global,
            )
        c.qos = QoS(set_prefetch_count, c.initial_prefetch_count)  # prefetch bookkeeping
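The QoS helper used here comes from kombu.common; it batches prefetch-count changes and flushes them through the callback. A standalone sketch:
from kombu.common import QoS

def set_prefetch_count(n):
    # in Celery this would call task_consumer.qos(prefetch_count=n, ...)
    print('prefetch ->', n)
    return n

qos = QoS(set_prefetch_count, 10)  # callback plus initial prefetch count
qos.increment_eventually()         # schedule +1; nothing applied yet
qos.update()                       # flush: invokes set_prefetch_count(11)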
3.3.2 Strategies
Running tasks requires a scheduling strategy, which can be seen as a form of load balancing. The strategies are as follows:
SCHED_STRATEGY_FCFS = 1
SCHED_STRATEGY_FAIR = 4

SCHED_STRATEGIES = {
    None: SCHED_STRATEGY_FAIR,
    'default': SCHED_STRATEGY_FAIR,
    'fast': SCHED_STRATEGY_FCFS,
    'fcfs': SCHED_STRATEGY_FCFS,
    'fair': SCHED_STRATEGY_FAIR,
}
Celery configures a callback strategy and a callback method for each task, for example: 'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>.
3.3.3 The basic default strategy
Let's take the basic default strategy as an example and see what it does. It builds a Request from the task instance, thereby tying together the broker message, the consumer, and multiprocessing. Specifically, Request.execute_using_pool is where the hookup with multiprocessing begins, e.g. with the consumer's pool of worker processes.
The code is:
def default(task, app, consumer,
            info=logger.info, error=logger.error, task_reserved=task_reserved,
            to_system_tz=timezone.to_system, bytes=bytes, buffer_t=buffer_t,
            proto1_to_proto2=proto1_to_proto2):
    """Default task execution strategy.

    Note:
        Strategies are here as an optimization, so sadly
        it's not very easy to override.
    """
    hostname = consumer.hostname  # the consumer's hostname
    connection_errors = consumer.connection_errors  # connection error types
    _does_info = logger.isEnabledFor(logging.INFO)

    # task event related
    # (optimized to avoid calling request.send_event)
    eventer = consumer.event_dispatcher
    events = eventer and eventer.enabled
    send_event = eventer.send
    task_sends_events = events and task.send_events

    call_at = consumer.timer.call_at
    apply_eta_task = consumer.apply_eta_task
    rate_limits_enabled = not consumer.disable_rate_limits
    get_bucket = consumer.task_buckets.__getitem__
    handle = consumer.on_task_request
    limit_task = consumer._limit_task
    body_can_be_buffer = consumer.pool.body_can_be_buffer
    Req = create_request_cls(Request, task, consumer.pool, hostname, eventer)  # build the Request class

    revoked_tasks = consumer.controller.state.revoked

    def task_message_handler(message, body, ack, reject, callbacks,
                             to_timestamp=to_timestamp):
        if body is None:
            body, headers, decoded, utc = (
                message.body, message.headers, False, True,
            )
            if not body_can_be_buffer:
                body = bytes(body) if isinstance(body, buffer_t) else body
        else:
            body, headers, decoded, utc = proto1_to_proto2(message, body)  # convert protocol 1 to protocol 2

        req = Req(
            message,
            on_ack=ack, on_reject=reject, app=app, hostname=hostname,
            eventer=eventer, task=task, connection_errors=connection_errors,
            body=body, headers=headers, decoded=decoded, utc=utc,
        )  # instantiate the request
        if (req.expires or req.id in revoked_tasks) and req.revoked():
            return

        if task_sends_events:
            send_event(
                'task-received',
                uuid=req.id, name=req.name,
                args=req.argsrepr, kwargs=req.kwargsrepr,
                root_id=req.root_id, parent_id=req.parent_id,
                retries=req.request_dict.get('retries', 0),
                eta=req.eta and req.eta.isoformat(),
                expires=req.expires and req.expires.isoformat(),
            )  # send the task-received event if enabled

        if req.eta:  # ETA/countdown handling
            try:
                if req.utc:
                    eta = to_timestamp(to_system_tz(req.eta))
                else:
                    eta = to_timestamp(req.eta, timezone.local)
            except (OverflowError, ValueError) as exc:
                req.reject(requeue=False)
            else:
                consumer.qos.increment_eventually()
                call_at(eta, apply_eta_task, (req,), priority=6)
        else:
            if rate_limits_enabled:  # rate limiting
                bucket = get_bucket(task.name)
                if bucket:
                    return limit_task(req, bucket, 1)
            task_reserved(req)
            if callbacks:
                [callback(req) for callback in callbacks]
            handle(req)  # hand the request off for processing
    return task_message_handler
The handler processing the request here is w.process_task, which was passed in when the consumer was initialized:
def _process_task(self, req):
    """Process task by sending it to the pool of workers."""
    try:
        req.execute_using_pool(self.pool)
    except TaskRevokedError:
        try:
            self._quick_release()   # Issue 877
        except AttributeError:
            pass
You can see that Request.execute_using_pool is where the hookup with multiprocessing begins, e.g. with the consumer's pool of worker processes.
3.3.4 Updating the known task strategies
At startup, update_strategies is called to update the known task strategies:
class Tasks(bootsteps.StartStopStep):
    """Bootstep starting the task message consumer."""

    def start(self, c):
        """Start task consumer."""
        c.update_strategies()
The code is as follows:
def update_strategies(self):
    loader = self.app.loader
    for name, task in self.app.tasks.items():
        self.strategies[name] = task.start_strategy(self.app, self)
        task.__trace__ = build_tracer(name, task, loader, self.hostname,
                                      app=self.app)
self.app.tasks holds the tasks collected when the application started. At this point Celery checks again whether the strategies need updating.
The variables are as follows:
self.app.tasks = {TaskRegistry: 10}
NotRegistered = {type} <class 'celery.exceptions.NotRegistered'>
'celery.chunks' = {chunks} <@task: celery.chunks of myTest at 0x7ffe3ff08198>
'myTest.add' = {add} <@task: myTest.add of myTest at 0x7ffe3ff08198>
'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest at 0x7ffe3ff08198>
'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest at 0x7ffe3ff08198>
'celery.group' = {group} <@task: celery.group of myTest at 0x7ffe3ff08198>
'celery.map' = {xmap} <@task: celery.map of myTest at 0x7ffe3ff08198>
'celery.chain' = {chain} <@task: celery.chain of myTest at 0x7ffe3ff08198>
'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest at 0x7ffe3ff08198>
'celery.chord' = {chord} <@task: celery.chord of myTest at 0x7ffe3ff08198>
'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest at 0x7ffe3ff08198>
__len__ = {int} 10
self = {Consumer} <Consumer: celery (running)>
Now let's look at the task.start_strategy function:
def start_strategy(self, app, consumer, **kwargs):
    return instantiate(self.Strategy, self, app, consumer, **kwargs)  # instantiate the strategy for this task
After this operation, strategies looks like the following; it holds the callback for each task, currently task_message_handler in every case. This is where a Request is built from the task instance, tying together the broker message, the consumer, and multiprocessing.
strategies = {dict: 10}
'celery.chunks' = {function} <function default.<locals>.task_message_handler at 0x7fc5a47d5a60>
'celery.backend_cleanup' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878400>
'celery.chord_unlock' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878598>
'celery.group' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878840>
'celery.map' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878ae8>
'celery.chain' = {function} <function default.<locals>.task_message_handler at 0x7fc5a4878d90>
'celery.starmap' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b0d0>
'celery.chord' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b378>
'myTest.add' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b620>
'celery.accumulate' = {function} <function default.<locals>.task_message_handler at 0x7fc5a487b8c8>
__len__ = {int} 10
The logic is as follows (some Consumer member variables are omitted for clarity):
+-----------------------+ +---------------------------+
| Celery | | Consumer |
| | | |
| consumer +---------------------> | | +---------------+
| | | task_consumer +---------------> | amqp.Consumer |
| _tasks | | | +---------------+
| + | | |
| | | | strategies +----------------+
+-----------------------+ | | |
| | | |
| +---------------------------+ |
| v
v
+------------------------------------------------------+-------------------------------------+ +-----------------------------------------------------------------------------+
| | | strategies = {dict: 10} |
| TaskRegistry | | 'celery.chunks' = function default.<locals>.task_message_handler |
| | | 'celery.backend_cleanup' = function default.<locals>.task_message_handler |
| NotRegistered = {type} <class 'celery.exceptions.NotRegistered'> | | 'celery.chord_unlock' = function default.^locals>.task_message_handler |
| 'celery.chunks' = {chunks} <@task: celery.chunks of myTest> | | 'celery.group' = function default.<localsv.task_message_handler |
| 'celery.backend_cleanup' = {backend_cleanup} <@task: celery.backend_cleanup of myTest > | | 'celery.map' = function default.<locals>.task_message_handler |
| 'celery.chord_unlock' = {unlock_chord} <@task: celery.chord_unlock of myTest> | | 'celery.chain' = function default.<locals>.task_message_handler |
| 'celery.group' = {group} <@task: celery.group of myTest> | | 'celery.starmap' = function default.<locals>.task_message_handler |
| 'celery.map' = {xmap} <@task: celery.map of myTest> | | 'celery.chord' = function default.<locals>.task_message_handler |
| 'celery.chain' = {chain} <@task: celery.chain of myTest> | | 'myTest.add' = function default.<locals^.task_message_handler |
| 'celery.starmap' = {xstarmap} <@task: celery.starmap of myTest> | | 'celery.accumulate' = function default.vlocals>.task_message_handler |
| 'celery.chord' = {chord} <@task: celery.chord of myTest> | | |
| 'myTest.add' = {add} <@task: myTest.add of myTest> | +-----------------------------------------------------------------------------+
| 'celery.accumulate' = {accumulate} <@task: celery.accumulate of myTest> |
| |
+--------------------------------------------------------------------------------------------+
3.3.5 Consuming
The following code ties the tasks to the corresponding kombu consumer:
c.task_consumer = c.app.amqp.TaskConsumer(
    c.connection, on_decode_error=c.on_decode_error,
)
The role of c.app.amqp.TaskConsumer is to return a Consumer. As you can see, the return value becomes c.task_consumer, i.e. a kombu.Consumer, with c.connection configured into it:
- <Consumer: [<Queue celery -> <Exchange celery(direct) bound to chan:1> -> celery bound to chan:1>]>
- <Connection: redis://localhost:6379>
Thus Celery's Consumer component is linked to kombu's consumer: celery.consumer.task_consumer is a kombu Consumer, and at this point the kombu Consumer is already tied to the channel, so when a message arrives on the connection, the kombu Consumer's callback is invoked. We will see below how this is used.
The code is as follows:
from kombu import Connection, Consumer, Exchange, Producer, Queue, pools

class AMQP:
    """App AMQP API: app.amqp."""

    Connection = Connection
    Consumer = Consumer
    Producer = Producer

    def TaskConsumer(self, channel, queues=None, accept=None, **kw):
        if accept is None:
            accept = self.app.conf.accept_content
        return self.Consumer(
            channel, accept=accept,
            queues=queues or list(self.queues.consume_from.values()),
            **kw
        )
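Its use can be sketched as follows (assuming the same app and Redis broker as earlier):
from celery import Celery

app = Celery('myTest', broker='redis://localhost:6379//')
conn = app.connection_for_read()
consumer = app.amqp.TaskConsumer(conn)  # a kombu.messaging.Consumer
print(consumer.queues)  # the queues from app.amqp.queues, e.g. the default 'celery' queue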
The logic is as follows (some Consumer member variables are omitted for clarity):
+--------------------------------+
| [Consumer] | For Read
| | +----------------------------------------+
| Connection+-------> | <Connection: redis://localhost:6379//> |
| | +----------------------------------------+
| |
| | +--------+
| +-----> | Tasks |
| | +--------+
| |
| app task_consumer+--------------------------------------------------------------------------------------+
| + | |
| | | |
| | | |
+--------------------------------+ |
| +-------------------------------------+ |
| | celery.app.amqp.AMQP | |
| | | |
+-----+----------------------+ | | |
| Celery | | BrokerConnection +-------> kombu.connection.Connection |
| | | | |
| amqp+------------>+ Connection +-------> kombu.connection.Connection |
| | | | |
| | | Consumer +-------> kombu.messaging.Consumer <----+
+----------------------------+ | |
| Producer +-------> kombu.messaging.Producer
| |
| producer_pool +-------> kombu.pools.ProducerPool
| |
| queues +-------> celery.app.amqp.Queues
| |
| router +-------> celery.app.routes.Router
+-------------------------------------+
3.4 The event loop sub-component
The previous sub-component configured the various tasks; this one starts consuming them. The code is located at: celery/worker/consumer/consumer.py.
It corresponds to the 'celery.worker.consumer.consumer:Evloop' step.
[<step: Connection>, <step: Events>, <step: Mingle>, <step: Gossip>, <step: Tasks>, <step: Control>, <step: Heart>, <step: event loop>]
class Evloop(bootsteps.StartStopStep):
    """Event loop service.

    Note:
        This is always started last.
    """

    label = 'event loop'
    last = True

    def start(self, c):
        self.patch_all(c)
        c.loop(*c.loop_args())

    def patch_all(self, c):
        c.qos._mutex = DummyLock()
As we can see, this simply calls the consumer's loop function to run the loop.
3.4.1 loop in consumer
The code is located at celery/worker/loops.py.
The consumer's loop function is the asynloop function in celery/worker/loops.py.
__init__ contains the following code configuring the loop function:
if not hasattr(self, 'loop'):
    self.loop = loops.asynloop if hub else loops.synloop
The picture now looks like this (some Consumer member variables are omitted for clarity):
+--------------------------------+ +--------+
| [Consumer] +-----> | Evloop |
| | +--------+
| | +--------------------------+
| | | on_tick +--------> Transport.register_with_event_loop
| +-------> | Hub |
| | | poller +---------> kombu.utils.eventio._poll
| | | |
| | | readers |
| | | |
| create_task_handler loop +-----------------------> create_loop+-------> loops.asynloop
| | | |
| | +--------------------------+
| app |
| + |
| | task_consumer +-------------------------------------------------------------------------->+
| | | |
+--------------------------------+ |
| |
| |
| |
+----+---------+ |
| Celery | +-------------------------------------+ |
| | | celery.app.amqp.AMQP | |
| | | | |
| | | | |
| | | BrokerConnection +-------> kombu.connection.Connection |
| | | | |
| amqp+-------->+ Connection +-------> kombu.connection.Connection |
| | | | |
+--------------+ | Consumer +-------> kombu.messaging.Consumer <----------+
| |
| Producer +-------> kombu.messaging.Producer
| |
| producer_pool +-------> kombu.pools.ProducerPool
| |
| queues +-------> celery.app.amqp.Queues
| |
| router +-------> celery.app.routes.Router
+-------------------------------------+
3.4.2 Configuring kombu.Consumer
Earlier we saw this code:
c.loop(*c.loop_args())
Note that self.task_consumer is used here, i.e. the kombu.Consumer, with c.connection configured into it:
def loop_args(self):
    return (self, self.connection, self.task_consumer,
            self.blueprint, self.hub, self.qos, self.amqheartbeat,
            self.app.clock, self.amqheartbeat_rate)
The logic is now as follows (some Consumer member variables are omitted for clarity):
+--------------------------------+ +--------+
| [Consumer] +--------> | Evloop |
| | +--------+
| | +--------------------------+
| | | on_tick +-----+--> Transport.register_with_event_loop
| +-------> | Hub | |
| | | | +--> AsynPool._create_write_handlers.<locals>.on_poll_start
| | | | +
| | | | |
| | | | v
| create_task_handler | | | iterate_file_descriptors_safely
| | | poller +---------> kombu.utils.eventio._poll ^
| | | | + |
| app loop +-----------------------> create_loop+-------> loops.asynloop | |
| + | | | | +-----+ |
| | task_consumer | +--------------------------+ +----------> | fd | +--------+
| | + | +-----+
| | | |
| | | | +--------------------------------------+
| | | Connection +----------> | <Connection: redis://localhost:6379> |
| | | | +--------------------------------------+
+--------------------------------+
| | ^
| | |
| v |
+----+----+ +----+-------------------------+ |
| Celery | | kombu . Consumer | |
| | | | |
| | | channel+--------------+
+---------+ | |
+------------------------------+
3.4.3 Starting consumption
In asynloop, the function:
- sets the message-handling function (which parses the message and executes the task); that is, the real message-handling logic is this create_task_handler;
- sets the kombu consumer's message callback; that is, on_task_received is the function that ultimately receives messages;
- calls hub.create_loop() to obtain the execution engine;
- calls next(loop) to drive the engine.
def asynloop(obj, connection, consumer, blueprint, hub, qos,
             heartbeat, clock, hbrate=2.0):
    """Non-blocking event loop."""
    RUN = bootsteps.RUN
    update_qos = qos.update
    errors = connection.connection_errors

    on_task_received = obj.create_task_handler()  # the message handler (parses and executes tasks)

    _enable_amqheartbeats(hub.timer, connection, rate=hbrate)

    consumer.on_message = on_task_received
    obj.controller.register_with_event_loop(hub)
    obj.register_with_event_loop(hub)
    consumer.consume()
    obj.on_ready()

    loop = hub.create_loop()

    try:
        while blueprint.state == RUN and obj.connection:
            state.maybe_shutdown()

            # We only update QoS when there's no more messages to read.
            # This groups together qos calls, and makes sure that remote
            # control commands will be prioritized over task messages.
            if qos.prev != qos.value:
                update_qos()

            try:
                next(loop)
            except StopIteration:
                loop = hub.create_loop()
    finally:
        try:
            hub.reset()
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception(
                'Error cleaning up after event loop: %r', exc)
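Note that hub.create_loop() returns a generator: each next() performs one iteration of polling and callback dispatch, which is why asynloop drives it with next(loop). A bare-bones sketch of the same drive pattern:
from kombu.asynchronous import Hub

hub = Hub()
loop = hub.create_loop()  # generator: one poll/dispatch iteration per next()
try:
    next(loop)            # run a single tick of the event loop
except StopIteration:
    loop = hub.create_loop()  # recreate and continue, as asynloop does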
At this point the asynchronous loop has started, and the worker begins waiting for and handling events. In the next article we will analyze the task concept and its implementation.