分佈式場景下使用APScheduler

  • 2020 年 2 月 19 日
  • 筆記

簡介

APScheduler是一個定時任務框架,其主要功能就是方便控制不同類型的定時任務,本身並沒有考慮分佈式多實例情況下的一些問題,本篇文章就來簡單談談APScheduler在簡單分佈式場景下的使用。

分佈式帶來的問題

比如有個服務A,服務A中有使用APScheduler添加任務的邏輯,每次添加的任務會在隨後固定的某個時間點被APScheduler調用執行。

在單節點情況下,這並沒有什麼問題,但隨着業務加大,你可能要開啟多個服務A來做負載時,此時APScheduler就會出現重複執行任務的問題。

為了方便說明,這裡使用MongoDB作為APScheduler的jobstore,使用線程池作為它的執行器。(如果你不明白我在說啥,建議看看此前APScheduler的文章)

scheduler = BlockingScheduler(      jobstores={"default": mongostore},      executors={"default": ThreadPoolExecutor(10)},      job_defaults={"coalesce": True, "max_instances": 3},      timezone='Asia/Shanghai',  )

如果開啟了多個服務A,服務A中都使用了相同配置的scheduler,此時就會出現任務重複執行的問題。

為何會有這個問題?一起來閱讀一下相關源碼,一探究竟。

因為使用了BlockingScheduler作為調度器,所以直接看到該類的代碼

# apscheduler/schedulers/blocking.py    class BlockingScheduler(BaseScheduler):      """      A scheduler that runs in the foreground      (:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block).      """      _event = None        # ... 省略部分代碼        def _main_loop(self):          wait_seconds = TIMEOUT_MAX          while self.state != STATE_STOPPED:              # 等待事件通知,wait_seconds為等待事件通知的超時時間              # wait()方法會阻塞線程,直到事件標誌狀態為true。              self._event.wait(wait_seconds)              # clear()方法將事件標誌狀態設置為false              self._event.clear()              wait_seconds = self._process_jobs()

_main_loop方法會構成主循環,其具體的執行邏輯在 _process_jobs方法中, _process_jobs方法部分代碼如下。

# apscheduler/schedulers/base.py/BaseScheduler    def _process_jobs(self):          """          Iterates through jobs in every jobstore, starts jobs that are due and figures out how long          to wait for the next round.            If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least          ``jobstore_retry_interval`` seconds.            """          if self.state == STATE_PAUSED:              self._logger.debug('Scheduler is paused -- not processing jobs')              return None            self._logger.debug('Looking for jobs to run')          now = datetime.now(self.timezone) # 當前時間          next_wakeup_time = None          events = []            with self._jobstores_lock:              # 從_jobstores中獲取當前要處理的任務              for jobstore_alias, jobstore in self._jobstores.items():                  try:                      # 以當前時間為基準,判斷是否到了執行時間                      due_jobs = jobstore.get_due_jobs(now)                  except Exception as e:                      # Schedule a wakeup at least in jobstore_retry_interval seconds                      # 在 jobstore 重試間隔時間(秒)內喚醒                      self._logger.warning('Error getting due jobs from job store %r: %s',                                           jobstore_alias, e)                      # 喚醒時間                      retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)                      if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:                          next_wakeup_time = retry_wakeup_time                        continue          # ... 省略部分代碼

_process_jobs方法通過 due_jobs=jobstore.get_due_jobs(now)獲取jobstore中的任務對象,通過前面的配置可知,mongodb是這裡的jobstore。

看到mongodb對應jobstore的代碼。

# apscheduler/jobstores/mongodb.py/MongoDBJobStore    def get_due_jobs(self, now):          timestamp = datetime_to_utc_timestamp(now)          return self._get_jobs({'next_run_time': {'$lte': timestamp}})

getduejobs方法主要調用 _get_jobs方法去獲取任務對象,要關注的重點是,它使用了lte以及時間戳作為參數,簡單用過mongodb的朋友都知道lte其實就是小於等於的意思,簡單而言,只要小於或等於timestamp這個時間戳的任務都會被獲取。

都看到這了,順便看一下 _get_jobs方法的代碼吧。

def _reconstitute_job(self, job_state):          # 反序列化,獲取任務對象參數          job_state = pickle.loads(job_state)          job = Job.__new__(Job)          job.__setstate__(job_state)          job._scheduler = self._scheduler          job._jobstore_alias = self._alias          return job        def _get_jobs(self, conditions):          jobs = []          failed_job_ids = []          for document in self.collection.find(conditions, ['_id', 'job_state'],                                               sort=[('next_run_time', ASCENDING)]):              try:                  jobs.append(self._reconstitute_job(document['job_state']))              except BaseException:                  self._logger.exception('Unable to restore job "%s" -- removing it',                                         document['_id'])                  failed_job_ids.append(document['_id'])            # Remove all the jobs we failed to restore          if failed_job_ids:              self.collection.remove({'_id': {'$in': failed_job_ids}})            return jobs # 返回所有小於等於某一時間戳的任務對象

到這裡就很清楚APScheduler會出現重複執行任務問題的原因。

啟動多個服務A,相當於運行同一份代碼多次,此時APSCheduler的配置都是相同的,即多個APScheduler實例連接同一個mongodb,此時mongodb中存在一個任務就有可能被APScheduler消費多次。

使用分佈式鎖

要解決APScheduler多實例重複執行任務的問題,最常見的解決方案就是使用分佈式鎖,而分佈式鎖中最常見的就是基於Redis構建的字段鎖。

Redis字段鎖很容易理解,就是通過set命令在redis中設置一個字段,如果字段存在,則是加鎖狀態,而字段不存在,則是解鎖狀態。

設計Redis鎖時,需要考慮操作原子性,避免同時去獲取Redis字段的情況出現,還需要考慮字段超時,避免因邏輯錯誤出現的長時間死鎖,所以設計Redis字段鎖還是需要一些tick的,這裡分享一種寫法,如下。

@contextmanager  def redis_lock(name, timeout=(24 + 2) * 60 * 60):      try:          today_string = datetime.datetime.now().strftime("%Y-%m-%d")          key = f"servername.lock.{name}.{today_string}"          log.info(f"<Redis Lock> {key}")          # 原子性的鎖: 不存在,創建鎖,返回1,相當於獲取鎖;存在,創建鎖失敗,返回0,相當於獲取鎖失敗;過一段時間超時,避免死鎖          # nx: 不存在,key值設置為value,返回1,存在,不操作,返回0          # ex: 設置超時          lock = bonus_redis.set(key, value=1, nx=True, ex=timeout)          yield lock      finally:          bonus_redis.delete(key) # 釋放鎖

通過上面方法設置的鎖與常用的鎖不同。

如果程序沒有獲得常用的鎖,則會阻塞等待鎖,而這裡涉及的鎖並不會等待,它的作用只是保證被鎖方法在特定時間段內只執行一次。

此外還要考慮的是加鎖位置,因為APScheduler會獲取小於某個時間戳下的所有任務,那為了避免任務被重複執行,最直觀的做法就是在任務函數中加上鎖,例子如下。

# 要被執行的任務函數  def tick():      with redislock() as lock:          if lock:              print('Tick! The time is: %s' % datetime.now())    if __name__ == '__main__':      scheduler = BackgroundScheduler()      # 添加到相應的jobstore中      scheduler.add_job(tick, 'interval', seconds=3) # 添加一個任務,3秒後運行      scheduler.start()      print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

結尾

至此分佈式場景下使用APScheduler的方法就介紹完了,核心思想就是確保多個APScheduler實例對同一任務只會執行一次,感謝你的閱讀。

如果文章對你有所幫助,點擊「在看」支持二兩,叩謝豪恩。