带你认识 flask 后台作业
- 2019 年 12 月 10 日
- 筆記
01
任务类别简介
任务进程为后台作业提供了一个便捷的解决方案。Worker过程独立于应用程序运行,甚至可以位于不同的系统上。应用程序和worker之间的通信是通过消息完成的。通过与物理相互作用来监视其进度。下图展示了一个典型的实现:

Python中最流行的任务类别是Celery。这是一个相当复杂的重叠,它有很多选项并支持多个消息示例。另一个流行的Python任务位置是Redis Queue(RQ),它牺牲了一些替代,,仅支持Redis消息本身,但作为交换,它的建立要比Celery简单长度
Celery和RQ都非常适合在Flask应用程序中支持后台任务,所以我可以选择更简单的RQ。不过,用Celery实现相同的功能其实也不难。如果您对Celery更有吸引力,可以阅读我的博客中的将Celery与Flask文章一起使用
02
使用RQ
RQ是一个标准的Python三方重叠,用pip
安装:
(venv) $ pip install rq (venv) $ pip freeze > requirements.txt
正如我前面提到的,应用和RQ worker之间的通信将在Redis消息中执行,因此你需要运行Redis服务器。有很多途径来安装和运行Redis服务器,可以下载其内核并执行编译和安装。如果你使用的是Windows中,微软在此处维护了Redis的的安装程序。在Linux的上,你可以通过操作系统的软件包管理器安装Redis的。Mac OS X的用户可以运行brew install redis
,使用然后redis-server
命令手动启动服务
除了确保服务正在运行并可以识别RQ访问之外,你不需要与Redis进行其他交互
03
创建任务
一个任务,不过是一个Python函数而已。以下是一个示例任务,我将其引入一个新的app / tasks.py模块:
app / tasks.py:示例后台任务
import time def example(seconds): print('Starting task') for i in range(seconds): print(i) time.sleep(1) print('Task completed')
该任务将秒数作为参数,然后在该时间量内等待,并每秒打印一次计数器
04
运行 RQ 工人
任务准备就绪,可以通过rq worker
来启动一个worker进程了:
(venv) $ rq worker microblog-tasks 18:55:06 RQ worker 'rq:worker:miguelsmac.90369' started, version 0.9.1 18:55:06 Cleaning registries for queue: microblog-tasks 18:55:06 18:55:06 *** Listening on microblog-tasks...
microblog-tasks
如果您想启动多个worker来扩展量子,您只需要运行rq worker
来生成更多连接到同一个模型的进程,就可以使用Worker进程现在连接到了Redis,并在称为的上面上查看可能的分配给它的任何作业。在生产环境中,您可能希望至少运行可用的CPU数量的工人。。然后,,当作业出现在特定位置时,任何可用的worker进程都可以获取它
05
执行任务
现在打开第二个终端窗口并激活虚拟环境。我将使用shell会话来启动worker中的example()
任务:
>>> from redis import Redis >>> import rq >>> queue = rq.Queue('microblog-tasks', connection=Redis.from_url('redis://')) >>> job = queue.enqueue('app.tasks.example', 23) >>> job.get_id() 'c651de7f-21a8-4068-afd5-8b982a6f6d32'
如果采用的是Redis服务器运行在不同的主机或端口号上,则使用RQ的Queue
类表示从应用程序端看到的任务类型。Redis
则需要使用其他URL。
队列的enqueue()
方法用于将作业添加到队列中。第一个参数是要执行的任务的名称,可直接传入函数对象或导入字符串。我发现传入字符串更加方便,因为不需要在应用程序对enqueue()
预期的任何剩余参数将被传递给worker中运行的函数。
enqueue()
只要进行了调用,运行着RQ worker的终端窗口上就会出现一些活动。你会看到example()
函数正在运行,并且连续打印一次计数器。同时,你的其他终端不会被分开,你可以继续在shell在上面的示例中,我调用job.get_id()
方法来获取分配给任务的唯一标识符。你可以尝试使用另一个有趣表达式来检查worker上的函数是否已完成:
>>> job.is_finished False
如果你像我在上面的示例中那样传递了23
,那么函数将运行约23秒。在那之后,job.is_finished
表达式将True
转化为。就是这么简单,炫酷否?
一旦函数完成,worker又回到等待作业的状态,所以如果你想进行更多的实验,你可以用不同的参数重复执行enqueue()
调用。),但最终会被删除。这很重要,任务类别不保留已执行作业的历史记录
06
报告任务进度
通常,对于长期运行的任务,您需要将一些进度信息提供给应用程序,从而可以将其显示给用户。RQ通过使用作业对象的meta
属性来支持这一点。让我重新编写example()
任务来编写进度报告:
app / tasks.py::带进度的示例后台任务
import time from rq import get_current_job def example(seconds): job = get_current_job() print('Starting task') for i in range(seconds): job.meta['progress'] = 100.0 * i / seconds job.save_meta() print(i) time.sleep(1) job.meta['progress'] = 100 job.save_meta() print('Task completed')
这个新版本的example()
使用RQ的get_current_job()
函数来获取一个作业实例,该实例与提交任务时返回给应用程序的实例类似。作业对象的meta
属性是一个字典,任务可以编写任何想要的与应用程序通信的自定义数据。在此示例中,我编写了progress
,表示完成任务的百分比。每次进程更新时,我都调用job.save_meta()
指示RQ将数据写入Redis,应用程序可以在其中找到它。
在应用程序方面(目前只是一个Python shell),我可以运行此任务,然后监视进度,如下所示:
>>> job = queue.enqueue('app.tasks.example', 23) >>> job.meta {} >>> job.refresh() >>> job.meta {'progress': 13.043478260869565} >>> job.refresh() >>> job.meta {'progress': 69.56521739130434} >>> job.refresh() >>> job.meta {'progress': 100} >>> job.is_finished True
如您所见,在另一侧,meta
属性可以被重新读。需要调用refresh()
方法来从Redis更新内容
07
任务的数据库表示
对于Web应用程序,情况会变得更复杂一些,因为一旦任务传递请求的处理而启动,该请求随即结束,而该任务因为我希望应用程序跟踪每个用户正在运行的任务,所以我需要使用数据库表来维护状态。你可以在下面看到新的Task
模型实现:
app / models.py:任务模型
# ... import redis import rq class User(UserMixin, db.Model): # ... tasks = db.relationship('Task', backref='user', lazy='dynamic') # ... class Task(db.Model): id = db.Column(db.String(36), primary_key=True) name = db.Column(db.String(128), index=True) description = db.Column(db.String(128)) user_id = db.Column(db.Integer, db.ForeignKey('user.id')) complete = db.Column(db.Boolean, default=False) def get_rq_job(self): try: rq_job = rq.job.Job.fetch(self.id, connection=current_app.redis) except (redis.exceptions.RedisError, rq.exceptions.NoSuchJobError): return None return rq_job def get_progress(self): job = self.get_rq_job() return job.meta.get('progress', 0) if job is not None else 100
模型这个状语从句:以前的模型有一个有趣的区别的英文id
主键字段的英文字符串类型,而不是整数类型。这是因为对于这个模型,我不会依赖数据库自己的主键生成,而是使用由RQ生成的作业标识符。
该模型将存储符合任务命名规范的名称(会传递给RQ),适用于向用户显示的任务描述,该任务的所属用户的关系以及任务是否已完成的布尔值。complete
字段的目的是将正在运行的任务与已完成的任务分开,因为运行中的任务需要特殊处理才能显示最新进度。
get_rq_job()
辅助方法可以用给定的任务ID加载RQ Job
实例。的英文这通过Job.fetch()
完成的,它会从Redis的存在中的数据中加载Job
实例。get_progress()
方法建立在get_rq_job()
的基础之上,并返回任务的进度百分比。该方法做一些有趣的假设,如果模型中的作业ID不存在于RQ变量中,则表示作业已完成和数据已过期并已从该中删除,因此在这种情况下返回的百分比为100。同时,如果job存在,但'meta'属性中找到进度相关的信息,那么可以安全地进行该作业计划运行,但还没有启动,所以在这种情况下进度是0。
改进更改数据库,需要生成新的迁移,然后升级数据库:
(venv) $ flask db migrate -m "tasks" (venv) $ flask db upgrade
新模型也可以添加到shell一部分中,盔甲在shell会话中访问它时无需导入:
microblog.py:添加任务模型到shell上下文中
from app import create_app, db, cli from app.models import User, Post, Message, Notification, Task app = create_app() cli.register(app) @app.shell_context_processor def make_shell_context(): return {'db': db, 'User': User, 'Post': Post, 'Message': Message, 'Notification': Notification, 'Task': Task}
08
将RQ与 Flask 集合在一起
Redis服务的连接URL需要添加到配置中:
class Config(object): # ... REDIS_URL = os.environ.get('REDIS_URL') or 'redis://'
与往常一样,Redis连接URL将来自环境变量,如果该变量未定义,则替换为该服务在当前主机的端口上运行并使用URL。
应用工厂函数将负责初始化Redis和RQ:
app / __ init__.py:整合RQ
# ... from redis import Redis import rq # ... def create_app(config_class=Config): # ... app.redis = Redis.from_url(app.config['REDIS_URL']) app.task_queue = rq.Queue('microblog-tasks', connection=app.redis) # ...
app.task_queue
将成为提交任务的重量。将附加到应用上会提供很大的便利,因为我可以在应用的任何地方使用current_app.task_queue
来访问它。为了方便应用的任何部分提交或检查任务,我可以在User
模型中创建一些辅助方法:
app / models.py:用户模型中的任务辅助方法
# ... class User(UserMixin, db.Model): # ... def launch_task(self, name, description, *args, **kwargs): rq_job = current_app.task_queue.enqueue('app.tasks.' + name, self.id, *args, **kwargs) task = Task(id=rq_job.get_id(), name=name, description=description, user=self) db.session.add(task) return task def get_tasks_in_progress(self): return Task.query.filter_by(user=self, complete=False).all() def get_task_in_progress(self, name): return Task.query.filter_by(name=name, user=self, complete=False).first()
launch_task()
方法是将任务提交到RQ,然后将其添加到数据库中。name
参数是函数名称,如app / tasks.py中所定义的那样。提交给RQ时,该函数已app.tasks.
预先添加到该名称中以构建符合规范的函数名称。description
参数是对呈现给用户的任务的友好描述。对于导出用户动态的函数,我将名称设置为export_posts
,将描述设置为Exporting posts...
。其余参数将传递给任务函数。launch_task()
函数首先调用队列的enqueue()
方法来提交作业。返回的作业对象包含由RQ分配的任务ID,因此我可以使用它在我的数据库中创建相应的Task
对象
请注意,launch_task()
将新的任务对象添加到会话中,但不会发出提交。替代,最好在更高层次函数中的数据库会话上进行操作,因为它允许您在替代事务中组合由替代这不是一个严格的规则,并且,在本章后面的子函数中也会存在一个例外的提交
get_tasks_in_progress()
方法返回该用户未完成任务的列表。稍后您会看到,我使用此方法在将有关正在运行的任务的信息渲染到用户的页面中
最后,get_task_in_progress()
是上一个方法的简化版本并返回指定的任务。我阻止用户同时启动两个或多个相同类型的任务,因此在启动任务之前,可以使用此方法来确定前一个任务是否还在运行
09
利用 RQ 任务发送电子邮件
不要认为本节偏离主题,我在上面说过,当后台完成任务完成时,将使用包含所有用户动态的JSON文件向用户发送电子邮件。我在第十章中生成的电子邮件功能需要通过两种方式进行扩展。首先,我需要添加对文件附件的支持,刹车我可以附加JSON文件。串行,send_email()
函数总是使用后台线程初始化发送电子邮件。当我要从后台任务发送电子邮件时(已经是初步的了),基于线程的二级后台任务没有什么意义,所以我需要同时支持同步和异步电子邮件的发送。
幸运的是,Flask-Mail支持附件,所以我需要做的就是扩展send_email()
函数的控件关键字参数,然后在Message
对象中配置它。选择在前台发送电子邮件时,我只需要添加一个sync=True
的关键字参数即可:
app / email.py:发送带附件的邮件
# ... def send_email(subject, sender, recipients, text_body, html_body, attachments=None, sync=False): msg = Message(subject, sender=sender, recipients=recipients) msg.body = text_body msg.html = html_body if attachments: for attachment in attachments: msg.attach(*attachment) if sync: mail.send(msg) else: Thread(target=send_async_email, args=(current_app._get_current_object(), msg)).start()
消息类的attach()
方法接受三个定义附件的参数:文件名,媒体类型和实际文件数据。文件名就是收件人看到的与附件关联的名称。媒体类型定义了这种附件的类型,这有助于电子邮件读者适当地渲染它。例如,如果您发送为image/png
媒体类型,则电子邮件阅读器会知道该附件是一个图像,在这种情况下,它可以显示它。对于用户动态数据文件,我将使用JSON格式,该格式使用application/json
媒体类型。最后一个参数包含附件内容的字符串或字节序列。
简单来说,send_email()
的attachments
参数将成为一个元组列表,每个元组将有三个元素对应于attach()
的三个参数。因此,我需要转换列表中的每个元素作为参数发送给attach()
。在Python中,如果你想将列表或元组中的每个元素作为参数传递给函数,你可以使用func(*args)
将这个列表或元祖解包成函数中的多个参数,而不必枯燥地一个个地传递,如func(args[0], args[1], args[2])
。例如,如果如果没有,调用将会引发一个参数,即列表。你有一个列表args = [1, 'foo']
,func(*args)
将会传递两个参数,就和你调用func(1, 'foo')
一样。*args
如电子邮件的同步发送,我需要做的就是,当sync
是True
的时候恢复成调用mail.send(msg)
10
任务助手
尽管我上面使用的example()
任务是一个简单的独立函数,但已添加用户动态的函数却需要应用中具有的某些功能,例如访问数据库和发送电子邮件。因为这将在单独的进程中运行,所以我需要初始化Flask-SQLAlchemy和Flask-Mail,而Flask-Mail又需要Flask应用程序实例以从中获取它们的配置。因此,我将在app / tasks.py模块的顶部添加Flask应用程序实例和应用程序:
app / tasks.py:创建应用及其自身
from app import create_app app = create_app() app.app_context().push()
当使用flask
命令时,根目录中的microblog.py模块创建应用实例,但RQ worker实际上却一无所知,所以当任务函数时,应用程序在此模块中创建,因为这是RQ worker要导入的唯一模块。你已经在好几个地方app.app_context()
看到了方法,按下一个使其使应用成为“当前”的应用实例,这样一来Flask-SQLAlchemy等插件才可以使用current_app.config
获取它们的配置。。根本没有,current_app
表达式会返回一个错误。
然后我开始考虑如何在这个函数运行时报告进度。另外通过job.meta
字典传递进度信息之外,我还想将通知推送给客户端,刹车自动动态更新完成百分比。逐步,我将使用我在第二十一章中生成的通知机制。更新将以与未读消息徽章非常类似的方式工作。当服务器渲染模板时,则包含从job.meta
获得的“静态”进度信息,但一旦页面置于客户端的浏览器中,通知将使用通知来动态更新百分比。由于通知的原因,更新正在运行的任务的进度将比上一个示例中的操作稍微多一些,所以我将创建一个专用于更新进度的包装函数:
app / tasks.py:设置任务进度
from rq import get_current_job from app import db from app.models import Task # ... def _set_task_progress(progress): job = get_current_job() if job: job.meta['progress'] = progress job.save_meta() task = Task.query.get(job.get_id()) task.user.add_notification('task_progress', {'task_id': job.get_id(), 'progress': progress}) if progress >= 100: task.complete = True db.session.commit()
任务导出可以调用_set_task_progress()
来记录进度百分比。函数该首先将百分比写入job.meta
字典搜索并将其保存到Redis的,然后从数据库加载相应的任务对象,使用并task.user
已有的add_notification()
方法将通知推送给请求该任务的用户。通知将被命名为task_progress
,并且伴随关联的数据将成为具有两个关联的字典:任务标识符和进度数值。稍后我将添加JavaScript代码来处理这种新的通知类型
该函数查看进度来确认任务函数是否已完成,并在这种情况下下更新数据库中任务对象的complete
属性。数据库提交调用通过add_notification()
添加的任务和通知对象都立即保存到数据库。任务,确保不执行任何数据库更改,因为执行本次调用父父的更改也写入数据库
11
实现导出任务
现在所有的准备工作已经完成,可以开始编写导出函数了。这个函数的高层结构如下:
app / tasks.py:导出用户动态通用结构
def export_posts(user_id): try: # read user posts from database # send email with data to user except: # handle unexpected errors
请求处理器中的应用程序可以防止意外错误,因为Flask自身捕获异常,然后将其整个任务包装在try / except中。将运行在由RQ控制的单独前进中,而不是烧瓶,因此如果发生任何意外错误,任务将中止,RQ将向控制台显示错误,然后返回等待新的作业。worker的输出或将其记录到文件中,否则将永远不会发现有错误。
让我们从上面带有注释的三部分中最简单的错误处理部分开始梳理:
app / tasks.py:更新用户动态错误处理
import sys # ... def export_posts(user_id): try: # ... except: _set_task_progress(100) app.logger.error('Unhandled exception', exc_info=sys.exc_info())
发生意外错误时,我将通过将进度设置为100%来将任务标记为完成,然后使用Flask应用程序中的日志记录器对象记录错误以及如何跟踪信息(调用sys.exc_info()
来获得)。记录器来记录错误的好处在于,你可以观察到你为瓶应用实现的任何日志记录机制。例如,在第七章中,我配置了要发送到管理员电子邮件地址的错误。只要使用app.logger
,我也可以得到这些错误信息
接下来,我将编写实际的起始代码,它只需发出一个数据库查询并在循环中遍历结果,随之而来的累积在字典中:
app / tasks.py:从数据库读取用户动态
import time from app.models import User, Post # ... def export_posts(user_id): try: user = User.query.get(user_id) _set_task_progress(0) data = [] i = 0 total_posts = user.posts.count() for post in user.posts.order_by(Post.timestamp.asc()): data.append({'body': post.body, 'timestamp': post.timestamp.isoformat() + 'Z'}) time.sleep(5) i += 1 _set_task_progress(100 * i // total_posts) # send email with data to user except: # ...
时间格式将采用ISO 8601标准。我使用的Python的datetime
对象不存储时区,因此在以ISO格式导出时间后,我添加了'Z',它表示UTC
我维护了一个计数器i
,并且在进入循环之前还需要发出一个额外的数据库查询,查询total_posts
导致用户动态的总数。使用了i
和total_posts
,在每个循环迭代我都可以使用从0到100的数字来更新任务进度
您可能会好奇我为什么会在每个循环time.sleep(5)
迭代中加入调用。最终是我想要延长增量所需的时间,刹车在用户动态不多的情况下也可以方便地查看到逐步进度的增长
下面是函数的最后部分,将会带上data
附件发送邮件给用户:
app / tasks.py:发送带用户动态的邮件给用户
import json from flask import render_template from app.email import send_email # ... def export_posts(user_id): try: # ... send_email('[Microblog] Your blog posts', sender=app.config['ADMINS'][0], recipients=[user.email], text_body=render_template('email/export_posts.txt', user=user), html_body=render_template('email/export_posts.html', user=user), attachments=[('posts.json', 'application/json', json.dumps({'posts': data}, indent=4))], sync=True) except: # ...
只是其实对send_email()
函数的调用。附件被定义为一个元组,其中有三个元素被传递给瓶邮件的Message
对象的attach()
方法。元组中的第三个元素是附件内容,它是用Python中的json.dumps()
函数生成的。
这里引用了一对新模板,它们以纯文本和HTML格式提供电子邮件正文的内容。这是文本模板的内容:
app / templates / email / export_posts.txt:更新用户动态文本邮件模板
Dear {{ user.username }}, Please find attached the archive of your posts that you requested. Sincerely, The Microblog Team
这是HTML版本的邮件模板:
app / templates / email / export_posts.html:更新用户动态HTML邮件模板
<p>Dear {{ user.username }},</p> <p>Please find attached the archive of your posts that you requested.</p> <p>Sincerely,</p> <p>The Microblog Team</p>
12
应用中的导出功能
剩下的就是将这个功能连接到应用,刹车用户发起请求并通过电子邮件发送用户动态给他们
下面是新的export_posts
视图函数:
app / main / routes.py:导出用户动态路由和视图函数
@bp.route('/export_posts') @login_required def export_posts(): if current_user.get_task_in_progress('export_posts'): flash(_('An export task is currently in progress')) else: current_user.launch_task('export_posts', _('Exporting posts...')) db.session.commit() return redirect(url_for('main.user', username=current_user.username))
该功能首先检查用户是否有未完成的任务,并在这种情况下只是闪现消息。对同一用户同时执行两个发起任务是没有意义的,可以避免。我可以使用前面实现的get_task_in_progress()
方法来检查这种情况
如果一个用户没有正在运行的导出任务,则调用launch_task()
来启动它。第一个参数是将传递给RQ worker的函数的名称,改为为app.tasks.
。第二个参数只是一个友好的文本描述,将会显示给用户。这两个值都会被写入数据库中的任务对象。该函数以重定向到用户个人主页结束
我认为最合适的地方是在用户个人主页,只有在用户查看他们自己的主页时,链接在“编辑个人资料”链接下面显示:
app / templates / user.html:用户个人主页的导出链接
... <p> <a href="{{ url_for('main.edit_profile') }}"> {{ _('Edit your profile') }} </a> </p> {% if not current_user.get_task_in_progress('export_posts') %} <p> <a href="{{ url_for('main.export_posts') }}"> {{ _('Export your posts') }} </a> </p> ... {% endif %}
此链接的渲染是有条件的,因为我不希望它在用户已经有发起任务执行时出现。
如果你想尝试一下,你可以按如下方式启动应用和RQ worker:
- 确保Redis正在运行
- :一个终端窗口,启动至少一个RQ worker实例。本处你可以运行命令
rq worker microblog-tasks
- 再打开另一个终端窗口,使用
flask run
(记得先设置FLASK_APP
变量)命令启动Flask应用
13
进度通知
为了完善这个功能,我想在后台任务运行时提醒用户任务完成的进度。在浏览Bootstrap组件选项时,我决定在导航栏的下方使用一个Alert组件。横条。我用蓝色的警报框来渲染闪现的消息。现在我要添加一个绿色的警报框来显示任务进度。样式如下:
app / templates / base.html:基础模板中的导出进度Alert组件
... {% block content %} <div class="container"> {% if current_user.is_authenticated %} {% with tasks = current_user.get_tasks_in_progress() %} {% if tasks %} {% for task in tasks %} <div class="alert alert-success" role="alert"> {{ task.description }} <span id="{{ task.id }}-progress">{{ task.get_progress() }}</span>% </div> {% endfor %} {% endif %} {% endwith %} {% endif %} ... {% endblock %} ...
外部条件在用户未登录时跳过所有与Alert相关的标记。而对于已登录的用户,我通过称为创建的get_tasks_in_progress()
方法来获取当前的任务列表。在当前版本的应用中,我最多只能得到一个结果,因为我可以多个替换任务同时执行,但将来我可能要支持可以共存的其他类型的任务,所以以通用的方式渲染Alert可以节省我以后的时间。
对于每项任务,我都会在页面上渲染一个警报元素。警报的颜色由第二个CSS样式控制,本处是alert-success
,而在闪现消息是alert-info
,引导文档所有游戏有关警报的HTML结构的详细信息。警报文本包括存储在Task
模型中的description
细分,后面跟着完成百分比。
被百分比封装在具有id
属性的<span>
元素中。原因是我要在收到通知时用的JavaScript刷新百分比。我给任务ID附加末尾-progress
来构造id
属性。当有通知到达时,通过其中的任务ID,我可以很容易地使用#<task.id>-progress
选择器找到正确的<span>
元素来更新。
如果您此时进行尝试,则每次导航到新页面时都会看到“静态”的进度更新。您可以注意到,在启动导出任务后,您可以自由导航到应用程序的不同页面,正在运行的任务的状态始终都会展示出来
为了对span>
元素的百分比的动态更新做准备,我将在JavaScript端编写一个辅助函数:
app / templates / base.html:动态更新任务进度的辅助函数
... {% block scripts %} ... <script> ... function set_task_progress(task_id, progress) { $('#' + task_id + '-progress').text(progress); } </script> ... {% endblock %}
这个函数接受一个任务id
和一个进度值,并使用jQuery为这个任务定位<span>
元素,转换为新进度作为其内容写入。无需验证页面上是否存在该元素,因为如果没有找到该元素,jQuery将不会执行任何操作。
app / tasks.py中的_set_task_progress()
函数每次更新进度时调用add_notification()
,就会产生新的通知。而我在第二十一章明智地以完全通用的方式实现了通知功能。所以当浏览器定期向服务器发送时通知更新请求时,浏览器会获得通过add_notification()
方法添加的任何通知
但是,这些JavaScript代码只能识别具有unread_message_count
名称的那些通知,并忽略其余部分。我现在需要做的是扩展该函数,通过调用我上面定义的set_task_progress()
函数来处理task_progress
通知。以下是处理通知更新版本JavaScript代码:
app / templates / base.html:通知处理器
for (var i = 0; i < notifications.length; i++) { switch (notifications[i].name) { case 'unread_message_count': set_message_count(notifications[i].data); break; case 'task_progress': set_task_progress( notifications[i].data.task_id, notifications[i].data.progress); break; } since = notifications[i].timestamp; }
现在我需要处理两个不同的通知,我决定用一个switch
语句替换检查unread_message_count
通知名称的if
语句,该语句包含我现在需要支持的每个通知。如果你对“ C”系列语言不熟悉,就可能从未见过是switch语句,它提供了一种方便的语法,可以替代一长串的if/elseif
语句。这是一个很棒的特性,因为当我需要支持更多通知时,只需简单地添加case
块即可。
回顾一下,RQ任务附加到task_progress
通知的数据是一个包含两个元素task_id
和progress
的字典,这两个元素是我调用set_task_progress()
的两个参数。
如果您现在运行该应用,则绿色Alert插入的进度指示器将每10秒刷新一次(因为刷新通知的时间间隔是10秒)。
如果您要维护非英语语言文件,则需要使用Flask-Babel刷新翻译文件,然后添加新的翻译:
(venv) $ flask translate update
如果您使用的是编码翻译,那么我已经为你完成了翻译工作,因此可以从下载包中提取app / translations / es / LC_MESSAGES / messages.po文件,从而将其添加到您的项目中。
翻译文件到位后,还要编译翻译文件:
(venv) $ flask translate compile