5分鐘快速掌握Python定時任務(wù)框架的實現(xiàn)
在實際開發(fā)中我們經(jīng)常會碰上一些重復(fù)性或周期性的任務(wù),比如像每天定時爬取某個網(wǎng)站的數(shù)據(jù)、一定周期定時運(yùn)行代碼訓(xùn)練模型等,類似這類的任務(wù)通常需要我們手動來進(jìn)行設(shè)定或調(diào)度,以便其能夠在我們設(shè)定好的時間內(nèi)運(yùn)行。
在 Windows 上我們可以通過計劃任務(wù)來手動實現(xiàn),而在 Linux 系統(tǒng)上往往我們會用到更多關(guān)于 crontab 的相關(guān)操作。但手動管理并不是一個很好的選擇,如果我們需要有十幾個不同的定時任務(wù)需要管理,那么每次通過人工來進(jìn)行干預(yù)未免有些笨拙,那這時候就真的是「人工智能」了。
所以將這些定時任務(wù)的調(diào)度代碼化才是能夠讓我們很好地從這種手動管理的純?nèi)肆Σ僮髦薪饷摮鰜怼?/p>
在 Python 生態(tài)中對于定時任務(wù)的一些操作主要有那么幾個:
schedule:第三方模塊,該模塊適合比較輕量級的一些調(diào)度任務(wù),但卻不適用于復(fù)雜時間的調(diào)度 APScheduler:第三方定時任務(wù)框架,是對 Java 第三方定時任務(wù)框架 Quartz 的模仿與移植,能提供比 schedule 更復(fù)雜的應(yīng)用場景,并且各種組件都是模塊化,易于使用與二次開發(fā)。 Celery Beat:屬于 celery 這分布式任務(wù)隊列第三方庫下的一個定時任務(wù)組件,如果使用需要配合 RabbitMQ 或 Redis 這類的消息隊列套件,需要花費(fèi)一定的時間在環(huán)境搭建上,但在高版本中已經(jīng)不支持 Windows。所以為了滿足能夠相對復(fù)雜的時間條件,又不需要在前期的環(huán)境搭建上花費(fèi)很多時間的前提下,選擇 APScheduler 來對我們的調(diào)度任務(wù)或定時任務(wù)進(jìn)行管理是個性價比極高的選擇。而本文主要會帶你快速上手有關(guān) APScheduler 的使用。
APScheduler 概念與組件雖然說官方文檔上的內(nèi)容不是很多,而且所列舉的 API 不是很多,但這側(cè)面也反映了這一框架的簡單易用。所以在使用 APScheduler 之前,我們需要對這個框架的一些概念簡單了解,主要有那么以下幾個:
觸發(fā)器(trigger) 任務(wù)持久化(job stores) 執(zhí)行器(executor) 調(diào)度器(scheduler)觸發(fā)器(trigger)所謂的觸發(fā)器就是用以觸發(fā)定時任務(wù)的組件,在 APScheduler 中主要是指時間觸發(fā)器,并且主要有三類時間觸發(fā)器可供使用:
date:日期觸發(fā)器。日期觸發(fā)器主要是在某一日期時間點上運(yùn)行任務(wù)時調(diào)用,是 APScheduler 里面最簡單的一種觸發(fā)器。所以通常也適用于一次性的任務(wù)或作業(yè)調(diào)度。 interval:間隔觸發(fā)器。間隔觸發(fā)器是在日期觸發(fā)器基礎(chǔ)上擴(kuò)展了對時間部分,比如時、分、秒、天、周這幾個部分的設(shè)定。是我們用以對重復(fù)性任務(wù)進(jìn)行設(shè)定或調(diào)度的一個常用調(diào)度器。設(shè)定了時間部分之后,從起始日期開始(默認(rèn)是當(dāng)前)會按照設(shè)定的時間去執(zhí)行任務(wù)。 cron:cron 表達(dá)式觸發(fā)器。cron 表達(dá)式觸發(fā)器就等價于我們 Linux 上的 crontab,它主要用于更復(fù)雜的日期時間進(jìn)行設(shè)定。但需要注意的是,APScheduler 不支持 6 位及以上的 cron 表達(dá)式,最多只支持到 5 位。任務(wù)持久化(job stores)任務(wù)持久化主要是用于將設(shè)定好的調(diào)度任務(wù)進(jìn)行存儲,即便是程序因為意外情況,如斷電、電腦或服務(wù)器重啟時,只要重新運(yùn)行程序時,APScheduler 就會根據(jù)對存儲好的調(diào)度任務(wù)結(jié)果進(jìn)行判斷,如果出現(xiàn)已經(jīng)過期但未執(zhí)行的情況會進(jìn)行相應(yīng)的操作。
APScheduler 為我們提供了多種持久化任務(wù)的途徑,默認(rèn)是使用 memory 也就是內(nèi)存的形式,但內(nèi)存并不是持久化最好的方式。最好的方式則是通過像數(shù)據(jù)庫這樣的載體來將我們的定時任務(wù)寫入到磁盤當(dāng)中,只要磁盤沒有損壞就能將數(shù)據(jù)給恢復(fù)。
APScheduler 支持的且常用的數(shù)據(jù)庫主要有:
sqlalchemy 形式的數(shù)據(jù)庫,這里就主要是指各種傳統(tǒng)的關(guān)系型數(shù)據(jù)庫,如 MySQL、PostgreSQL、SQLite 等。 mongodb 非結(jié)構(gòu)化的 Mongodb 數(shù)據(jù)庫,該類型數(shù)據(jù)庫經(jīng)常用于對非結(jié)構(gòu)化或版結(jié)構(gòu)化數(shù)據(jù)的存儲或操作,如 JSON。 redis 內(nèi)存數(shù)據(jù)庫,通常用作數(shù)據(jù)緩存來使用,當(dāng)然通過一些主從復(fù)制等方式也能實現(xiàn)當(dāng)中數(shù)據(jù)的持久化或保存。通常我們可以在創(chuàng)建 Scheduler 實例時創(chuàng)建,或是單獨(dú)為任務(wù)指定。配置的方式相對簡單,我們只需要指定對應(yīng)的數(shù)據(jù)庫鏈接即可。
執(zhí)行器(executor)執(zhí)行器顧名思義就是執(zhí)行我們?nèi)蝿?wù)的對象,在計算機(jī)內(nèi)通常要么是 CPU 調(diào)度任務(wù),要么是單獨(dú)維護(hù)一個線程來運(yùn)行任務(wù)。所以 APScheduler 里的執(zhí)行器通常就是 ThreadPoolExecutor 或 ProcessPoolExecutor 這樣的線程池和進(jìn)程池兩種。
當(dāng)然如果是和協(xié)程或異步相關(guān)的任務(wù)調(diào)度,還可以使用對應(yīng)的 AsyncIOExecutor、TwistedExecutor 和 GeventExecutor 三種執(zhí)行器。
調(diào)度器(scheduler)調(diào)度器的選擇主要取決于你當(dāng)前的程序環(huán)境以及 APScheduler 的用途。根據(jù)用途的不同,APScheduler 又提供了以下幾種調(diào)度器:
BlockingScheduler:阻塞調(diào)度器,當(dāng)程序中沒有任何存在主進(jìn)程之中運(yùn)行東西時,就則使用該調(diào)度器。 BackgroundScheduler:后臺調(diào)度器,在不使用后面任何的調(diào)度器且希望在應(yīng)用程序內(nèi)部運(yùn)行時的后臺啟動時才進(jìn)行使用,如當(dāng)前你已經(jīng)開啟了一個 Django 或 Flask 服務(wù)。 AsyncIOScheduler:AsyncIO 調(diào)度器,如果代碼是通過 asyncio 模塊進(jìn)行異步操作,使用該調(diào)度器。 GeventScheduler:Gevent 調(diào)度器,如果代碼是通過 gevent 模塊進(jìn)行協(xié)程操作,使用該調(diào)度器 TornadoScheduler:Tornado 調(diào)度器,在 Tornado 框架中使用 TwistedScheduler:Twisted 調(diào)度器,在基于 Twisted 的框架或應(yīng)用程序中使用 QtScheduler:Qt 調(diào)度器,在構(gòu)建 Qt 應(yīng)用中進(jìn)行使用。通常情況下如果不是和 Web 項目或應(yīng)用集成共存,那么往往都首選 BlockingScheduler 調(diào)度器來進(jìn)行操作,它會在當(dāng)前進(jìn)程中啟動相應(yīng)的線程來進(jìn)行任務(wù)調(diào)度與處理;反之,如果是和 Web 項目或應(yīng)用共存,那么需要選擇 BackgroundScheduler 調(diào)度器,因為它不會干擾當(dāng)前應(yīng)用的線程或進(jìn)程狀況。
基于對以上的概念和組件認(rèn)識,我們就能基本上摸清 APScheduler 的運(yùn)行流程:
設(shè)定調(diào)度器(scheduler)用以對任務(wù)的調(diào)度與安排進(jìn)行全局統(tǒng)籌 對相應(yīng)的函數(shù)或方法上設(shè)定相應(yīng)的觸發(fā)器(trigger),并添加到調(diào)度器中 如有任務(wù)持久化(job stores)需要則需要設(shè)定對應(yīng)的持久化層,否則默認(rèn)使用內(nèi)存存儲任務(wù) 當(dāng)觸發(fā)器被觸發(fā)時,就將任務(wù)交由執(zhí)行器(executor)進(jìn)行執(zhí)行APScheduler 快速上手雖然 APScheduler 里面的概念和組件看起來有點多,但在使用上并不算很復(fù)雜,我們可以通過本節(jié)的示例就能夠很快使用。
選擇對應(yīng)的 scheduler
在使用之前我們需要先實例化一個 scheduler 對象,所有的 scheduler 對象都被放在了 apscheduler.schedulers 模塊下,我們可以直接通過查看 API 文檔或者借助 IDE 補(bǔ)全的提示來獲取相應(yīng)的 scheduler 對象。
這里我直接選取了最基礎(chǔ)的 BlockingScheduler:
# main.py from apscheduler.schedulers.blocking import BlockingScheduler scheduler = BlockingScheduler()
配置 scheduler
對于 scheduler 的一些配置我們可以直接在實例化對象時就進(jìn)行配置,當(dāng)然也可以在創(chuàng)建實例化對象之后再進(jìn)行配置。
實例化時進(jìn)行參數(shù)配置:
# main.pyfrom datetime import datetime from apscheduler.executors.pool import ThreadPoolExecutorfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.schedulers.blocking import BlockingScheduler # 任務(wù)持久化 使用 SQLitejobstores = { ’default’: SQLAlchemyJobStore(url = ’sqlite:///jobs.db’)}# 執(zhí)行器配置executors = { ’default’: ThreadPoolExecutor(20),}# 關(guān)于 Job 的相關(guān)配置,見官方文檔 APIjob_defaults = { ’coalesce’: False, ’next_run_time’: datetime.now()}scheduler = BlockingScheduler( jobstores = jobstores, executors = executors, job_defaults = job_defaults, timezone = ’Asia/Shanghai’)
或是通過 scheduler.configure 方法進(jìn)行同樣的操作:
scheduler = BlockingScheduler()scheduler.configure(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=’Asia/Shanghai’)添加并執(zhí)行你的任務(wù)
創(chuàng)建 scheduler 對象之后,我們需要調(diào)用其下的 add_job() 或是 scheduled_job() 方法來將我們需要執(zhí)行的函數(shù)進(jìn)行注冊。前者是以傳參的形式指定對應(yīng)的函數(shù)名,而后者則是以裝飾器的形式直接對我們要執(zhí)行的函數(shù)進(jìn)行修飾。
比如我現(xiàn)在有一個輸出此時此刻時間的函數(shù) now():
from datetime import datetime def now(trigger): print(f'trigger:{trigger} -> {datetime.now()}')
然后我打算每 5 秒的時候運(yùn)行一次,那我們使用 add_job() 可以這樣寫:
if __name__ == ’__main__’: scheduler.add_job(now, trigger = 'interval', args = ('interval',), seconds = 5) scheduler.start()
在調(diào)用 start() 方法之后調(diào)度器就會開始執(zhí)行,并在控制臺上看到對應(yīng)的結(jié)果了:
trigger:interval -> 2021-01-16 21:19:43.356674trigger:interval -> 2021-01-16 21:19:46.679849trigger:interval -> 2021-01-16 21:19:48.356595
當(dāng)然使用 @scheduled_job 的方式來裝飾我們的任務(wù)或許會更加自由一些,于是上面的例子就可以寫成這樣:
@scheduler.scheduled_job(trigger = 'interval', args = ('interval',), seconds = 5)def now(trigger): print(f'trigger:{trigger} -> {datetime.now()}') if __name__ == ’__main__’: scheduler.start()
運(yùn)行之后就會在控制臺看到同樣的結(jié)果了。
不過需要注意的是,添加任務(wù)一定要在 start() 方法執(zhí)行前調(diào)用,否則會找不到任務(wù)或是拋出異常。
將 APScheduler 集成到 Web 項目中如果你是正在做有關(guān)的 Web 項目且存在一些定時任務(wù),那么得益于 APScheduler 由于多樣的調(diào)度器,我們能夠?qū)⑵浜臀覀兊捻椖拷Y(jié)合到一起。
如果你正在使用 Flask,那么 Flask-APScheduler 這一別人寫好的第三方包裝庫就很適合你,雖然它沒有相關(guān)的文檔,但只要你了解了前面我所介紹的有關(guān)于 APScheduler 的概念和組件,你就能很輕易地看懂這個第三方庫倉庫里的示例代碼。
如果你使用的不是 Flask 框架,那么 APScheduler 本身也提供了一些對任務(wù)或作業(yè)的增刪改查操作,我們可以自己編寫一套合適的 API。
這里我使用的是 FastAPI 這一目前流行的 Web 框架。demo 項目結(jié)構(gòu)如下:
temp-scheduler├── config.py # 配置項├── main.py # API 文件└── scheduler.py # APScheduler 相關(guān)設(shè)置安裝依賴
這里我們需要的依賴不多,只需要簡單幾個即可:
pip install fastapi apscheduler sqlalchemy uvicorn配置項
如果項目中模塊過多,那么使用一個文件或模塊來進(jìn)行統(tǒng)一管理是最好的選擇。這里的 config.py 我們主要像 Flask 的配置那樣簡單設(shè)定:
from apscheduler.executors.pool import ThreadPoolExecutorfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.schedulers.blocking import BlockingScheduler class SchedulerConfig: JOBSTORES = {'default': SQLAlchemyJobStore(url='sqlite:///job.db')} EXECUTORS = {'default': ThreadPoolExecutor(20)} JOB_DEFAULTS = {'coalesce': False} @classmethod def to_dict(cls): return { 'jobstores': cls.JOBSTORES, 'executors': cls.EXECUTORS, 'job_defaults': cls.JOB_DEFAULTS, }
在 SchedulerConfig 配置項中我們可以自己實現(xiàn)一個 to_dict() 類方法,以便我們后續(xù)傳參時通過解包的方式直接傳入配置參數(shù)即可。
Scheduler 相關(guān)設(shè)置scheduler.py 模塊的設(shè)定也比較簡單,即設(shè)定對應(yīng)的 scheduler 調(diào)度器即可。由于是演示 demo 我還將要定期執(zhí)行的任務(wù)也放在了這個模塊當(dāng)中:
import loggingfrom datetime import datetime from apscheduler.schedulers.background import BackgroundScheduler from config import SchedulerConfig scheduler = BackgroundScheduler()logger = logging.getLogger(__name__) def init_scheduler() -> None: # config scheduler scheduler.configure(**SchedulerConfig.to_dict()) logger.info('scheduler is running...') # schedule test scheduler.add_job( func=mytask, trigger='date', args=('APScheduler Initialize.',), next_run_time=datetime.now(), ) scheduler.start() def mytask(message: str) -> None: print(f'[{datetime.now()}] message: {message}')
在這一部分中:
init_scheduler() 方法主要用于在 API 服務(wù)啟動時被調(diào)用,然后對 scheduler 對象的配置以及測試 mytask() 則是我們要定期執(zhí)行的任務(wù),后續(xù)我們可以通過 APScheduler 提供的方法來自行添加任務(wù)API 設(shè)置在 main.py 模塊就主要存放著我們由 FastAPI 所構(gòu)建的相關(guān) API。如果在后續(xù)開發(fā)時存在多個接口,此時就需要將不同接口放在不同模塊文件中,以達(dá)到路由的分發(fā)與管理,類似于 Flask 的藍(lán)圖模式。
import loggingimport uuidfrom datetime import datetimefrom typing import Any, Dict, Optional, Sequence, Union from fastapi import FastAPIfrom pydantic import BaseModel from scheduler import init_scheduler, mytask, scheduler logger = logging.getLogger(__name__) app = FastAPI(title='APScheduler API')app.add_event_handler('startup', init_scheduler) class Job(BaseModel): id: Union[int, str, uuid.UUID] name: Optional[str] = None func: Optional[str] = None args: Optional[Sequence[Optional[str]]] = None kwargs: Optional[Dict[str, Any]] = None executor: Optional[str] = None misfire_grace_time: Optional[str] = None coalesce: Optional[bool] = None max_instances: Optional[int] = None next_run_time: Optional[Union[str, datetime]] = None @app.post('/add')def add_job( message: str, trigger: str, trigger_args: Optional[dict], id: Union[str, int, uuid.UUID],): try: scheduler.add_job( func=mytask, trigger=trigger, kwargs={'message': message}, id=id, **trigger_args, ) except Exception as e: logger.exception(e.args) return {'status_code': 0, 'message': '添加失敗'} return {'status_code': 1, 'message': '添加成功'} @app.delete('/delete/{id}')def delete_job(id: Union[str, int, uuid.UUID]): '''delete exist job by id''' try: scheduler.remove_job(job_id=id) except Exception: return dict( message='刪除失敗', status_code=0, ) return dict( message='刪除成功', status_code=1, ) @app.put('/reschedule/{id}')def reschedule_job( id: Union[str, int, uuid.UUID], trigger: str, trigger_args: Optional[dict]): try: scheduler.reschedule_job(job_id=id, trigger=trigger, **trigger_args) except Exception as e: logger.exception(e.args) return dict( message='修改失敗', status_code=0, ) return dict( message='修改成功', status_code=1, ) @app.get('/job')def get_all_jobs(): jobs = None try: job_list = scheduler.get_jobs() if job_list: jobs = [Job(**task.__getstate__()) for task in job_list] except Exception as e: logger.exception(e.args) return dict( message='查詢失敗', status_code=0, jobs=jobs, ) return dict( message='查詢成功', status_code=1, jobs=jobs, ) @app.get('/job/{id}')def get_job_by_id(id: Union[int, str, uuid.UUID]): jobs = [] try: job = scheduler.get_job(job_id=id) if job: jobs = [Job(**job.__getstate__())] except Exception as e: logger.exception(e.args) return dict( message='查詢失敗', status_code=0, jobs=jobs, ) return dict( message='查詢成功', status_code=1, jobs=jobs, )
以上代碼看起來很多,其實核心的就那么幾點:
FastAPI 對象 app 的初始化。這里用到的 add_event_handler() 方法就有點像 Flask 中的 before_first_request,會在 Web 服務(wù)請求伊始進(jìn)行操作,理解為初始化相關(guān)的操作即可。
API 接口路由。路由通過 app 對象下的對應(yīng) HTTP 方法來實現(xiàn),如 GET、POST、PUT 等。這里的裝飾器用法其實也和 Flask 很類似,就不多贅述。
scheduler 對象的增刪改查。從 scheduler.py 模塊中引入我們創(chuàng)建好的 scheduler 對象之后就可以直接用來做增刪改查的操作:
增:使用 add_job() 方法,其主要的參數(shù)是要運(yùn)行的函數(shù)(或方法)、觸發(fā)器以及觸發(fā)器參數(shù)等 刪:使用 delete_job() 方法,我們需要傳入一個對應(yīng)任務(wù)的 id 參數(shù),用以能夠查找到對應(yīng)的任務(wù) 改:使用 reschedule_job() 方法,這里也需要一個對應(yīng)任務(wù)的 id 參數(shù),以及需要重新修改的觸發(fā)器及其參數(shù) 查:使用 get_jobs() 和 get_job() 兩個方法,前者是直接獲取到當(dāng)前調(diào)度的所有任務(wù),返回的是一個包含了 APScheduler.job.Job 對象的列表,而后者是通過 id 參數(shù)來查找對應(yīng)的任務(wù)對象;這里我通過底層源碼使用 __getstate__() 來獲取到任務(wù)的相關(guān)信息,這些信息我們通過事先設(shè)定好的 Job 對象來對其進(jìn)行序列化,最后將信息從接口中返回。運(yùn)行完成以上的所有操作之后,我們就可以打開控制臺,進(jìn)入到該目錄下并激活我們的虛擬環(huán)境,之后運(yùn)行:
uvicorn main:app
之后我們就能在 FastAPI 默認(rèn)的地址 http://127.0.0.1:8000/docs 中看到關(guān)于全部接口的 Swagger 文檔頁面了:
fastapi 集成的 swagger 頁面
之后我們可以直接在文檔里面或使用 Postman 來自己進(jìn)行接口測試即可。
結(jié)尾本文介紹了有關(guān)于 APScheduler 框架的概念及其用法,并進(jìn)行了簡單的實踐。
得益于 APScheduler 的模塊化設(shè)計才可以讓我們更方便地去理解、使用它,并將其運(yùn)用到我們實際的開發(fā)過程中。
從 APScheduler 目前的 Github 倉庫代碼以及 issue 來看,作者已經(jīng)在開始重構(gòu) 4.0 版本,當(dāng)中的一些源代碼和 API 也有較大的變動,相信在 4.0 版本中將會引入更多的新特性。
但如果現(xiàn)階段你正打算使用或已經(jīng)使用 APScheduler 用于實際生產(chǎn)中,那么希望本文能對會你有所幫助。
到此這篇關(guān)于5分鐘快速掌握Python定時任務(wù)框架的實現(xiàn)的文章就介紹到這了,更多相關(guān)Python 定時任務(wù)內(nèi)容請搜索好吧啦網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持好吧啦網(wǎng)!
相關(guān)文章:
