分布式场景下使用APScheduler

  • 2020 年 2 月 19 日
  • 筆記

简介

APScheduler是一个定时任务框架,其主要功能就是方便控制不同类型的定时任务,本身并没有考虑分布式多实例情况下的一些问题,本篇文章就来简单谈谈APScheduler在简单分布式场景下的使用。

分布式带来的问题

比如有个服务A,服务A中有使用APScheduler添加任务的逻辑,每次添加的任务会在随后固定的某个时间点被APScheduler调用执行。

在单节点情况下,这并没有什么问题,但随着业务加大,你可能要开启多个服务A来做负载时,此时APScheduler就会出现重复执行任务的问题。

为了方便说明,这里使用MongoDB作为APScheduler的jobstore,使用线程池作为它的执行器。(如果你不明白我在说啥,建议看看此前APScheduler的文章)

scheduler = BlockingScheduler(      jobstores={"default": mongostore},      executors={"default": ThreadPoolExecutor(10)},      job_defaults={"coalesce": True, "max_instances": 3},      timezone='Asia/Shanghai',  )

如果开启了多个服务A,服务A中都使用了相同配置的scheduler,此时就会出现任务重复执行的问题。

为何会有这个问题?一起来阅读一下相关源码,一探究竟。

因为使用了BlockingScheduler作为调度器,所以直接看到该类的代码

# apscheduler/schedulers/blocking.py    class BlockingScheduler(BaseScheduler):      """      A scheduler that runs in the foreground      (:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block).      """      _event = None        # ... 省略部分代码        def _main_loop(self):          wait_seconds = TIMEOUT_MAX          while self.state != STATE_STOPPED:              # 等待事件通知,wait_seconds为等待事件通知的超时时间              # wait()方法会阻塞线程,直到事件标志状态为true。              self._event.wait(wait_seconds)              # clear()方法将事件标志状态设置为false              self._event.clear()              wait_seconds = self._process_jobs()

_main_loop方法会构成主循环,其具体的执行逻辑在 _process_jobs方法中, _process_jobs方法部分代码如下。

# apscheduler/schedulers/base.py/BaseScheduler    def _process_jobs(self):          """          Iterates through jobs in every jobstore, starts jobs that are due and figures out how long          to wait for the next round.            If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least          ``jobstore_retry_interval`` seconds.            """          if self.state == STATE_PAUSED:              self._logger.debug('Scheduler is paused -- not processing jobs')              return None            self._logger.debug('Looking for jobs to run')          now = datetime.now(self.timezone) # 当前时间          next_wakeup_time = None          events = []            with self._jobstores_lock:              # 从_jobstores中获取当前要处理的任务              for jobstore_alias, jobstore in self._jobstores.items():                  try:                      # 以当前时间为基准,判断是否到了执行时间                      due_jobs = jobstore.get_due_jobs(now)                  except Exception as e:                      # Schedule a wakeup at least in jobstore_retry_interval seconds                      # 在 jobstore 重试间隔时间(秒)内唤醒                      self._logger.warning('Error getting due jobs from job store %r: %s',                                           jobstore_alias, e)                      # 唤醒时间                      retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)                      if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:                          next_wakeup_time = retry_wakeup_time                        continue          # ... 省略部分代码

_process_jobs方法通过 due_jobs=jobstore.get_due_jobs(now)获取jobstore中的任务对象,通过前面的配置可知,mongodb是这里的jobstore。

看到mongodb对应jobstore的代码。

# apscheduler/jobstores/mongodb.py/MongoDBJobStore    def get_due_jobs(self, now):          timestamp = datetime_to_utc_timestamp(now)          return self._get_jobs({'next_run_time': {'$lte': timestamp}})

getduejobs方法主要调用 _get_jobs方法去获取任务对象,要关注的重点是,它使用了lte以及时间戳作为参数,简单用过mongodb的朋友都知道lte其实就是小于等于的意思,简单而言,只要小于或等于timestamp这个时间戳的任务都会被获取。

都看到这了,顺便看一下 _get_jobs方法的代码吧。

def _reconstitute_job(self, job_state):          # 反序列化,获取任务对象参数          job_state = pickle.loads(job_state)          job = Job.__new__(Job)          job.__setstate__(job_state)          job._scheduler = self._scheduler          job._jobstore_alias = self._alias          return job        def _get_jobs(self, conditions):          jobs = []          failed_job_ids = []          for document in self.collection.find(conditions, ['_id', 'job_state'],                                               sort=[('next_run_time', ASCENDING)]):              try:                  jobs.append(self._reconstitute_job(document['job_state']))              except BaseException:                  self._logger.exception('Unable to restore job "%s" -- removing it',                                         document['_id'])                  failed_job_ids.append(document['_id'])            # Remove all the jobs we failed to restore          if failed_job_ids:              self.collection.remove({'_id': {'$in': failed_job_ids}})            return jobs # 返回所有小于等于某一时间戳的任务对象

到这里就很清楚APScheduler会出现重复执行任务问题的原因。

启动多个服务A,相当于运行同一份代码多次,此时APSCheduler的配置都是相同的,即多个APScheduler实例连接同一个mongodb,此时mongodb中存在一个任务就有可能被APScheduler消费多次。

使用分布式锁

要解决APScheduler多实例重复执行任务的问题,最常见的解决方案就是使用分布式锁,而分布式锁中最常见的就是基于Redis构建的字段锁。

Redis字段锁很容易理解,就是通过set命令在redis中设置一个字段,如果字段存在,则是加锁状态,而字段不存在,则是解锁状态。

设计Redis锁时,需要考虑操作原子性,避免同时去获取Redis字段的情况出现,还需要考虑字段超时,避免因逻辑错误出现的长时间死锁,所以设计Redis字段锁还是需要一些tick的,这里分享一种写法,如下。

@contextmanager  def redis_lock(name, timeout=(24 + 2) * 60 * 60):      try:          today_string = datetime.datetime.now().strftime("%Y-%m-%d")          key = f"servername.lock.{name}.{today_string}"          log.info(f"<Redis Lock> {key}")          # 原子性的锁: 不存在,创建锁,返回1,相当于获取锁;存在,创建锁失败,返回0,相当于获取锁失败;过一段时间超时,避免死锁          # nx: 不存在,key值设置为value,返回1,存在,不操作,返回0          # ex: 设置超时          lock = bonus_redis.set(key, value=1, nx=True, ex=timeout)          yield lock      finally:          bonus_redis.delete(key) # 释放锁

通过上面方法设置的锁与常用的锁不同。

如果程序没有获得常用的锁,则会阻塞等待锁,而这里涉及的锁并不会等待,它的作用只是保证被锁方法在特定时间段内只执行一次。

此外还要考虑的是加锁位置,因为APScheduler会获取小于某个时间戳下的所有任务,那为了避免任务被重复执行,最直观的做法就是在任务函数中加上锁,例子如下。

# 要被执行的任务函数  def tick():      with redislock() as lock:          if lock:              print('Tick! The time is: %s' % datetime.now())    if __name__ == '__main__':      scheduler = BackgroundScheduler()      # 添加到相应的jobstore中      scheduler.add_job(tick, 'interval', seconds=3) # 添加一个任务,3秒后运行      scheduler.start()      print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

结尾

至此分布式场景下使用APScheduler的方法就介绍完了,核心思想就是确保多个APScheduler实例对同一任务只会执行一次,感谢你的阅读。

如果文章对你有所帮助,点击「在看」支持二两,叩谢豪恩。