Python源碼學習Schedule
- 2019 年 10 月 3 日
- 筆記
關於我
一個有思想的程式猿,終身學習實踐者,目前在一個創業團隊任team lead,技術棧涉及Android、Python、Java和Go,這個也是我們團隊的主要技術棧。
Github:https://github.com/hylinux1024
微信公眾號:終身開發者(angrycode)
上一篇《一個簡單的Python調度器》介紹了一個簡單的Python
調度器的使用,後來我翻閱了一下它的源碼,驚奇的發現核心庫才一個文件,程式碼量短短700行不到。這是絕佳的學習材料。
讓我喜出望外的是這個庫的作者竟然就是我最近閱讀的一本書《Python Tricks》的作者!現在就讓我們看看大神的實現思路。
0x00 準備
項目地址
https://github.com/dbader/schedule
將程式碼checkout
到本地
環境
PyCharm+venv+Python3
0x01 用法
這個在上一篇也介紹過了,非常簡單
import schedule # 定義需要執行的方法 def job(): print("a simple scheduler in python.") # 設置調度的參數,這裡是每2秒執行一次 schedule.every(2).seconds.do(job) if __name__ == '__main__': while True: schedule.run_pending() # 執行結果 a simple scheduler in python. a simple scheduler in python. a simple scheduler in python. ...
這個庫的文檔也很詳細,可以瀏覽 https://schedule.readthedocs.io/ 了解庫的大概用法
0x02 項目結構
(venv) ➜ schedule git:(master) tree -L 2 . ... ├── requirements-dev.txt ├── schedule │ └── __init__.py ├── setup.py ├── test_schedule.py ├── tox.ini └── venv ├── bin ├── include ├── lib ├── pip-selfcheck.json └── pyvenv.cfg 8 directories, 18 files
schedule
目錄下就一個__init__.py
文件,這是我們需要重點學習的地方。setup.py
文件是發布項目的配置文件test_schedule.py
是單元測試文件,一開始除了看文檔外,也可以從單元測試中入手,了解這個庫的使用requirements-dev.txt
開發環境的依賴庫文件,如果核心的庫是不需要第三方的依賴的,但是單元測試需要venv
是我checkout
後創建的,原本的項目是沒有的
0x03 schedule
我們知道__init__.py
是定義Python
包必需的文件。在這個文件中定義方法、類都可以在使用import
命令時導入到工程項目中,然後使用。
schedule 源碼
以下是schedule
會用到的模組,都是Python
內部的模組。
import collections import datetime import functools import logging import random import re import time logger = logging.getLogger('schedule')
然後定義了一個日誌列印工具實例
接著是定義了該模組的3個異常類的結構體系,是由Exception
派生出來的,分別是ScheduleError
、ScheduleValueError
和IntervalError
class ScheduleError(Exception): """Base schedule exception""" pass class ScheduleValueError(ScheduleError): """Base schedule value error""" pass class IntervalError(ScheduleValueError): """An improper interval was used""" pass
還定義了一個CancelJob
的類,用於取消調度器的繼續執行
class CancelJob(object): """ Can be returned from a job to unschedule itself. """ pass
例如在自定義的需要被調度方法中返回這個CancelJob
類就可以實現一次性的任務
# 定義需要執行的方法 def job(): print("a simple scheduler in python.") # 返回CancelJob可以停止調度器的後續執行 return schedule.CancelJob
接著就是這個庫的兩個核心類Scheduler
和Job
。
class Scheduler(object): """ Objects instantiated by the :class:`Scheduler <Scheduler>` are factories to create jobs, keep record of scheduled jobs and handle their execution. """ class Job(object): """ A periodic job as used by :class:`Scheduler`. :param interval: A quantity of a certain time unit :param scheduler: The :class:`Scheduler <Scheduler>` instance that this job will register itself with once it has been fully configured in :meth:`Job.do()`. Every job runs at a given fixed time interval that is defined by: * a :meth:`time unit <Job.second>` * a quantity of `time units` defined by `interval` A job is usually created and returned by :meth:`Scheduler.every` method, which also defines its `interval`. """
Scheduler
是調度器的實現類,它負責調度任務(job
)的創建和執行。
Job
則是對需要執行任務的抽象。
這兩個類是這個庫的核心,後面我們還會看到詳細的分析。
接下來就是默認調度器default_scheduler
和任務列表jobs
的創建。
# The following methods are shortcuts for not having to # create a Scheduler instance: #: Default :class:`Scheduler <Scheduler>` object default_scheduler = Scheduler() #: Default :class:`Jobs <Job>` list jobs = default_scheduler.jobs # todo: should this be a copy, e.g. jobs()?
在執行import schedule
後,就默認創建了default_scheduler
。而Scheduler
的構造方法為
def __init__(self): self.jobs = []
在執行初始化時,調度器就創建了一個空的任務列表。
在文件的最後定義了一些鏈式調用的方法,使用起來也是非常人性化的,值得學習。
這裡的方法都定義在模組下,而且都是封裝了default_scheduler
實例的調用。
def every(interval=1): """Calls :meth:`every <Scheduler.every>` on the :data:`default scheduler instance <default_scheduler>`. """ return default_scheduler.every(interval) def run_pending(): """Calls :meth:`run_pending <Scheduler.run_pending>` on the :data:`default scheduler instance <default_scheduler>`. """ default_scheduler.run_pending() def run_all(delay_seconds=0): """Calls :meth:`run_all <Scheduler.run_all>` on the :data:`default scheduler instance <default_scheduler>`. """ default_scheduler.run_all(delay_seconds=delay_seconds) def clear(tag=None): """Calls :meth:`clear <Scheduler.clear>` on the :data:`default scheduler instance <default_scheduler>`. """ default_scheduler.clear(tag) def cancel_job(job): """Calls :meth:`cancel_job <Scheduler.cancel_job>` on the :data:`default scheduler instance <default_scheduler>`. """ default_scheduler.cancel_job(job) def next_run(): """Calls :meth:`next_run <Scheduler.next_run>` on the :data:`default scheduler instance <default_scheduler>`. """ return default_scheduler.next_run def idle_seconds(): """Calls :meth:`idle_seconds <Scheduler.idle_seconds>` on the :data:`default scheduler instance <default_scheduler>`. """ return default_scheduler.idle_seconds
我們看下入口方法run_pending()
,從本文一開頭的Demo
可以知道這個是啟動調度器的方法。這裡它執行了default_scheduler
中的方法。
default_scheduler.run_pending()
所以我們就把目光定位到Scheduler
類的相應方法
def run_pending(self): """ Run all jobs that are scheduled to run. Please note that it is *intended behavior that run_pending() does not run missed jobs*. For example, if you've registered a job that should run every minute and you only call run_pending() in one hour increments then your job won't be run 60 times in between but only once. """ runnable_jobs = (job for job in self.jobs if job.should_run) for job in sorted(runnable_jobs): self._run_job(job)
這個方法中首先從jobs
列表將需要執行的任務過濾後放在runnable_jobs
列表,然後將其排序後順序執行內部的_run_job(job)
方法
def _run_job(self, job): ret = job.run() if isinstance(ret, CancelJob) or ret is CancelJob: self.cancel_job(job)
在_run_job
方法中就調用了job
類中的run
方法,並根據返回值判斷是否需要取消任務。
這時候我們要看下Job
類的實現邏輯。
首先我們要看下Job
是什麼時候創建的。還是從Demo
中的程式碼入手
schedule.every(2).seconds.do(job)
這裡先執行了schedule.every()
方法
def every(interval=1): """Calls :meth:`every <Scheduler.every>` on the :data:`default scheduler instance <default_scheduler>`. """ return default_scheduler.every(interval)
這個方法就是scheduler
類中的every
方法
def every(self, interval=1): """ Schedule a new periodic job. :param interval: A quantity of a certain time unit :return: An unconfigured :class:`Job <Job>` """ job = Job(interval, self) return job
在這裡創建了一個任務job
,並將參數interval
和scheduler
實例傳入到構造方法中,最後返回job
實例用於實現鏈式調用。
跳轉到Job
的構造方法
def __init__(self, interval, scheduler=None): self.interval = interval # pause interval * unit between runs self.latest = None # upper limit to the interval self.job_func = None # the job job_func to run self.unit = None # time units, e.g. 'minutes', 'hours', ... self.at_time = None # optional time at which this job runs self.last_run = None # datetime of the last run self.next_run = None # datetime of the next run self.period = None # timedelta between runs, only valid for self.start_day = None # Specific day of the week to start on self.tags = set() # unique set of tags for the job self.scheduler = scheduler # scheduler to register with
主要初始化了間隔時間配置、需要執行的方法、調度器各種時間單位等。
執行every
方法之後又調用了seconds
這個屬性方法
@property def seconds(self): self.unit = 'seconds' return self
設置了時間單位,這個設置秒,當然還有其它類似的屬性方法minutes
、hours
、days
等等。
最後就是執行了do
方法
def do(self, job_func, *args, **kwargs): """ Specifies the job_func that should be called every time the job runs. Any additional arguments are passed on to job_func when the job runs. :param job_func: The function to be scheduled :return: The invoked job instance """ self.job_func = functools.partial(job_func, *args, **kwargs) try: functools.update_wrapper(self.job_func, job_func) except AttributeError: # job_funcs already wrapped by functools.partial won't have # __name__, __module__ or __doc__ and the update_wrapper() # call will fail. pass self._schedule_next_run() self.scheduler.jobs.append(self) return self
在這裡使用functools
工具的中的偏函數partial
將我們自定義的方法封裝成可調用的對象
然後就調用_schedule_next_run
方法,它主要是對時間的解析,按照時間對job
排序,我覺得這個方法是本項目中的技術點,邏輯也是稍微複雜一丟丟,仔細閱讀就可以看懂,主要是對時間datetime
的使用。由於篇幅,這裡就不再貼出程式碼。
這裡就完成了任務job
的添加。然後在調用run_pending
方法中就可以讓任務執行。
0x04 總結一下
schedule
庫定義兩個核心類Scheduler
和Job
。在導入包時就默認創建一個Scheduler
對象,並初始化任務列表。
schedule
模組提供了鏈式調用的介面,在配置schedule
參數時,就會創建任務對象job
,並會將job
添加到任務列表中,最後在執行run_pending
方法時,就會調用我們自定義的方法。
這個庫的核心思想是使用面向對象方法,對事物能夠準確地抽象,它總體的邏輯並不複雜,是學習源碼很不錯的範例。
0x05 學習資料
- https://github.com/dbader/schedule
- https://schedule.readthedocs.io