Django定时任务
目前有个需求,需要将定时任务迁移到web平台来,最开始是用的crontab,但不打算用了。期望的效果是只要启动了web server,那么定时任务就随之应用启动,因为有些功能是与web server有关,所以不打算依赖于系统,在将来做迁移时也方便一些,只要启动server那么定时任务就有了。
有2个选项,一个是apscheduler,另一个是celery。
最开始倒是想采用celery的,功能丰富,但就是对于目前的需求来说太重了,还需要配置redis。
于是采取了django-apscheduler。简单一些,比较直接。
首先安装包
pip install django-apscheduler
在settings.py中添加app
INSTALLED_APPS = (
# ...
"django_apscheduler",
)
运行一下migrate
python manage.py migrate
好了,可以用了。
为了测试一下,先弄一个简单的定时任务。
在任意一个app下增加一个文件就叫做task.py:
from datetime import datetime
import time
if __name__ == "__main__":
for i in range(10):
print(f"start: {datetime.now()}")
time.sleep(5)
print(f"end: {datetime.now()}")
在对应的app目录下新增目录,层次结构如下:
my_app
- management
__init__.py
- commands
__init__.py
start_task.py
- apps.py
- views.py
...
编辑start_task.py,这里用subprocess启动新进程,并使用backgroundscheduler后台启动,不阻塞任务;这里有一个调用文件的定时任务,还有一个删除记录的定时任务:
import logging
import subprocess
from datetime import datetime
from django.conf import settings
# from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import util
logger = logging.getLogger(__name__)
def sample():
subprocess.Popen("/path/env/bin/python /path/my_tasks.py > /path/start.log", shell=True)
print(f"done: {datetime.now()}")
@util.close_old_connections
def delete_old_job_executions(max_age=15552000):
"""
15552000 = 86400*180, 180 days
"""
DjangoJobExecution.objects.delete_old_job_executions(max_age)
class Command(BaseCommand):
def handle(self, *args, **options):
# scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
scheduler = BackgroundScheduler(timezone=settings.TIME_ZONE)
scheduler.add_jobstore(DjangoJobStore(), "default")
scheduler.add_job(
sample,
trigger=CronTrigger(minute="*/2"), # Every 2 minutes
id="sample",
max_instances=1,
replace_existing=True,
)
logger.info("Added job 'sample'.")
scheduler.add_job(
delete_old_job_executions,
trigger=CronTrigger(
day_of_week="mon", hour="00", minute="00"
), # Midnight on Monday, before start of the next work week.
id="delete_old_job_executions",
max_instances=1,
replace_existing=True,
)
logger.info("Added weekly job: 'delete_old_job_executions'.")
try:
logger.info("Starting scheduler...")
scheduler.start()
except KeyboardInterrupt:
logger.info("Stopping scheduler...")
scheduler.shutdown()
logger.info("Scheduler shut down successfully!")
此时一个命令行窗口中运行python manage.py runserver启动server,再开一个命令行窗口运行python manage.py start_task启动定时任务,就能看到对应输出。其实此时不启动server也可以,启动server只是为了在admin里看到记录。
当然也可以通过runserver就把定时任务带起来,不用单独再启动命令,需要做少许改动。
在当前app下找到apps.py,增加一个ready方法,比如:
from django.apps import AppConfig
class TasksConfig(AppConfig):
name = 'tasks'
def ready(self):
from tasks.management.commands import start_task
start_task.Command().handle()
并在settings里新增一个记录:
INSTALLED_APPS = [
......
'tasks.apps.TasksConfig',
]
此时运行python manage.py runserver就能看到对应的任务定时在运行,admin也能看到记录。
不过由于settings会重复加载,于是会造成看到到时间点启动了2个及以上的同样的任务在运行,虽然在设置里有replace_existing参数,不会在记录中产生重复数据,但是对应的进程却仍然保留在2个及以上。这是个不好的现象,因为如果是计算量很大的进程,在这里会消耗很多资源容易卡死,即便最后的数据只有一份,但进程仍然产生了重复。
这种情况可以指定--noreload来避免,运行python manage.py runserver --noreload,此时就定时任务只有一个进程了。
但是一旦部署到服务器上,比如我现在用的是gunicorn+nginx,并且worker或threads是大于1的,从日志来看仍然产生了多余的进程,gunicorn的运行不支持--noreload,也试过--preload,但是产生了DB相关的报错。
很可惜,如果能直接gunicorn启动时就带上定时任务多好。。
现在就打算systemd中启动server,然后增加一个systemd用起启动定时任务命令,都是指向django。gunicorn的报错等之后再看看,估计重复进程的问题无法解决,在github上django-apscheduler的issues里看到一个帖子,作者倾向于单独命令启动任务。
[Question] How to run apscheduler while using gunicorn by sysmtemd?
此时删除掉'tasks.apps.TasksConfig', 由于用了backgroundscheduler,一启动就会退出,可以在handle方法最后加上一个while True: time.sleep(1), 这个只是单独运行python manage.py start_task时用到。