分佈式場景下使用APScheduler
- 2020 年 2 月 19 日
- 筆記
簡介
APScheduler是一個定時任務框架,其主要功能就是方便控制不同類型的定時任務,本身並沒有考慮分佈式多實例情況下的一些問題,本篇文章就來簡單談談APScheduler在簡單分佈式場景下的使用。
分佈式帶來的問題
比如有個服務A,服務A中有使用APScheduler添加任務的邏輯,每次添加的任務會在隨後固定的某個時間點被APScheduler調用執行。
在單節點情況下,這並沒有什麼問題,但隨着業務加大,你可能要開啟多個服務A來做負載時,此時APScheduler就會出現重複執行任務的問題。
為了方便說明,這裡使用MongoDB作為APScheduler的jobstore,使用線程池作為它的執行器。(如果你不明白我在說啥,建議看看此前APScheduler的文章)
scheduler = BlockingScheduler( jobstores={"default": mongostore}, executors={"default": ThreadPoolExecutor(10)}, job_defaults={"coalesce": True, "max_instances": 3}, timezone='Asia/Shanghai', )
如果開啟了多個服務A,服務A中都使用了相同配置的scheduler,此時就會出現任務重複執行的問題。
為何會有這個問題?一起來閱讀一下相關源碼,一探究竟。
因為使用了BlockingScheduler作為調度器,所以直接看到該類的代碼
# apscheduler/schedulers/blocking.py class BlockingScheduler(BaseScheduler): """ A scheduler that runs in the foreground (:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block). """ _event = None # ... 省略部分代碼 def _main_loop(self): wait_seconds = TIMEOUT_MAX while self.state != STATE_STOPPED: # 等待事件通知,wait_seconds為等待事件通知的超時時間 # wait()方法會阻塞線程,直到事件標誌狀態為true。 self._event.wait(wait_seconds) # clear()方法將事件標誌狀態設置為false self._event.clear() wait_seconds = self._process_jobs()
_main_loop
方法會構成主循環,其具體的執行邏輯在 _process_jobs
方法中, _process_jobs
方法部分代碼如下。
# apscheduler/schedulers/base.py/BaseScheduler def _process_jobs(self): """ Iterates through jobs in every jobstore, starts jobs that are due and figures out how long to wait for the next round. If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least ``jobstore_retry_interval`` seconds. """ if self.state == STATE_PAUSED: self._logger.debug('Scheduler is paused -- not processing jobs') return None self._logger.debug('Looking for jobs to run') now = datetime.now(self.timezone) # 當前時間 next_wakeup_time = None events = [] with self._jobstores_lock: # 從_jobstores中獲取當前要處理的任務 for jobstore_alias, jobstore in self._jobstores.items(): try: # 以當前時間為基準,判斷是否到了執行時間 due_jobs = jobstore.get_due_jobs(now) except Exception as e: # Schedule a wakeup at least in jobstore_retry_interval seconds # 在 jobstore 重試間隔時間(秒)內喚醒 self._logger.warning('Error getting due jobs from job store %r: %s', jobstore_alias, e) # 喚醒時間 retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval) if not next_wakeup_time or next_wakeup_time > retry_wakeup_time: next_wakeup_time = retry_wakeup_time continue # ... 省略部分代碼
_process_jobs
方法通過 due_jobs=jobstore.get_due_jobs(now)
獲取jobstore中的任務對象,通過前面的配置可知,mongodb是這裡的jobstore。
看到mongodb對應jobstore的代碼。
# apscheduler/jobstores/mongodb.py/MongoDBJobStore def get_due_jobs(self, now): timestamp = datetime_to_utc_timestamp(now) return self._get_jobs({'next_run_time': {'$lte': timestamp}})
getduejobs方法主要調用 _get_jobs
方法去獲取任務對象,要關注的重點是,它使用了lte以及時間戳作為參數,簡單用過mongodb的朋友都知道lte其實就是小於等於的意思,簡單而言,只要小於或等於timestamp這個時間戳的任務都會被獲取。
都看到這了,順便看一下 _get_jobs
方法的代碼吧。
def _reconstitute_job(self, job_state): # 反序列化,獲取任務對象參數 job_state = pickle.loads(job_state) job = Job.__new__(Job) job.__setstate__(job_state) job._scheduler = self._scheduler job._jobstore_alias = self._alias return job def _get_jobs(self, conditions): jobs = [] failed_job_ids = [] for document in self.collection.find(conditions, ['_id', 'job_state'], sort=[('next_run_time', ASCENDING)]): try: jobs.append(self._reconstitute_job(document['job_state'])) except BaseException: self._logger.exception('Unable to restore job "%s" -- removing it', document['_id']) failed_job_ids.append(document['_id']) # Remove all the jobs we failed to restore if failed_job_ids: self.collection.remove({'_id': {'$in': failed_job_ids}}) return jobs # 返回所有小於等於某一時間戳的任務對象
到這裡就很清楚APScheduler會出現重複執行任務問題的原因。
啟動多個服務A,相當於運行同一份代碼多次,此時APSCheduler的配置都是相同的,即多個APScheduler實例連接同一個mongodb,此時mongodb中存在一個任務就有可能被APScheduler消費多次。
使用分佈式鎖
要解決APScheduler多實例重複執行任務的問題,最常見的解決方案就是使用分佈式鎖,而分佈式鎖中最常見的就是基於Redis構建的字段鎖。
Redis字段鎖很容易理解,就是通過set命令在redis中設置一個字段,如果字段存在,則是加鎖狀態,而字段不存在,則是解鎖狀態。
設計Redis鎖時,需要考慮操作原子性,避免同時去獲取Redis字段的情況出現,還需要考慮字段超時,避免因邏輯錯誤出現的長時間死鎖,所以設計Redis字段鎖還是需要一些tick的,這裡分享一種寫法,如下。
@contextmanager def redis_lock(name, timeout=(24 + 2) * 60 * 60): try: today_string = datetime.datetime.now().strftime("%Y-%m-%d") key = f"servername.lock.{name}.{today_string}" log.info(f"<Redis Lock> {key}") # 原子性的鎖: 不存在,創建鎖,返回1,相當於獲取鎖;存在,創建鎖失敗,返回0,相當於獲取鎖失敗;過一段時間超時,避免死鎖 # nx: 不存在,key值設置為value,返回1,存在,不操作,返回0 # ex: 設置超時 lock = bonus_redis.set(key, value=1, nx=True, ex=timeout) yield lock finally: bonus_redis.delete(key) # 釋放鎖
通過上面方法設置的鎖與常用的鎖不同。
如果程序沒有獲得常用的鎖,則會阻塞等待鎖,而這裡涉及的鎖並不會等待,它的作用只是保證被鎖方法在特定時間段內只執行一次。
此外還要考慮的是加鎖位置,因為APScheduler會獲取小於某個時間戳下的所有任務,那為了避免任務被重複執行,最直觀的做法就是在任務函數中加上鎖,例子如下。
# 要被執行的任務函數 def tick(): with redislock() as lock: if lock: print('Tick! The time is: %s' % datetime.now()) if __name__ == '__main__': scheduler = BackgroundScheduler() # 添加到相應的jobstore中 scheduler.add_job(tick, 'interval', seconds=3) # 添加一個任務,3秒後運行 scheduler.start() print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
結尾
至此分佈式場景下使用APScheduler的方法就介紹完了,核心思想就是確保多個APScheduler實例對同一任務只會執行一次,感謝你的閱讀。
如果文章對你有所幫助,點擊「在看」支持二兩,叩謝豪恩。