手机网页设计网站建设,wordpress中文怎么设置中文乱码,wordpress是什么意思,建筑工程发布网站系列文章目录
【Django开发入门】ORM的增删改查和批量操作 【Django功能开发】编写自定义manage命令 文章目录系列文章目录前言一、django定时任务二、django-apscheduler基本使用1.安装django-apscheduler2.配置settings.py的INSTALLED_APPS3.通过命令生成定时记录表3.如何创…系列文章目录
【Django开发入门】ORM的增删改查和批量操作 【Django功能开发】编写自定义manage命令 文章目录系列文章目录前言一、django定时任务二、django-apscheduler基本使用1.安装django-apscheduler2.配置settings.py的INSTALLED_APPS3.通过命令生成定时记录表3.如何创建自定义manage命令4.创建runapscheduler.py文件5.修改任务,上一个定时将被清空6.系统中启动和停止定时7.如何记录进程id8.对自定义manage命令添加参数控制定时任务的开启和停止二、django-apscheduler参数应用1.jobstore作业存储2.scheduler调度器3.executor执行器4.trigger触发器5.job任务管理第一种方式调度器条用add_job方法第二种方式装饰器执行6.监听定时任务总结前言
顾名思义定时任务是会在后台一直运行的但是在使用过程中我们还是会更新程序的尤其是在刚开始写定时任务时会频繁的启动定时程序但是这样可能有问题的每次启动程序都是打开了新的定时进程前边的定时程序可能没有关闭还在默默的帮你完成之前设定的工作需要看实际情况。 网络上虽然有很多教程其实使用方式都有些问题真的不敢直接用。这里整理了我自己在django框架中使用定时程序的一点心得如有需要可自取。 提示以下是本篇文章正文内容下面案例可供参考
一、django定时任务
在百度上随便一搜 django定时任务有很多库都实现了定时任务比如django-crontab、Celery、django-apschedulerapscheduler、time自定义等这些python库如果在windows系统还可以使用系统自带的定时任务linux系统自带的crontab任务。 上述这些都可以实现定时任务在开发中也可能都应用过本篇博客介绍的是在django框架下开发定时功能我这里推荐使用的是django-apscheduler适用于中小系统定时任务功能。
定时任务描述crontab适用于linux系统django-crontab适用于linux系统,基于crontabCelery适用于linux系统任务队列大型项目windows定时适用于windows系统time适用于多种系统django-apscheduler适用于多种系统中小型项目
二、django-apscheduler基本使用
1.安装django-apscheduler 代码如下示例 pip install django-apscheduler2.配置settings.py的INSTALLED_APPS
代码如下示例
INSTALLED_APPS (# ...django_apscheduler,
)3.通过命令生成定时记录表
使用python的manage命令我们应该cd到项目目录下就是manage.py文件所在的目录。
我们应该使用python manage.py makemigrations 和 python manage.py migrate同步数据库,数据库中将生成2个表django_apscheduler_djangojob和django_apscheduler_djangojobexecution。
3.如何创建自定义manage命令
自定义命令整理中
4.创建runapscheduler.py文件
runapscheduler.py正是通过自定义创建manange命令的py文件可以通过python manage.py runapscheduler 启动定时程序。
import loggingfrom django.conf import settingsfrom apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from django.core.management.base import BaseCommand
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_apscheduler import utillogger logging.getLogger(__name__)def my_job():# Your job processing logic here...print(123)passdef delete_old_job_executions(max_age604_800):此作业从数据库中删除早于“max_age”的APScheduler作业执行条目。它有助于防止数据库中塞满不再有用的旧历史记录。最长7天DjangoJobExecution.objects.delete_old_job_executions(max_age)class Command(BaseCommand):help Runs APScheduler.def handle(self, *args, **options):scheduler BlockingScheduler(timezonesettings.TIME_ZONE)scheduler.add_jobstore(DjangoJobStore(), default)scheduler.add_job(my_job,triggerCronTrigger(second*/10), # Every 10 secondsidmy_job, # The id assigned to each job MUST be uniquemax_instances1,replace_existingTrue,)logger.info(Added job my_job.)scheduler.add_job(delete_old_job_executions,triggerCronTrigger(day_of_weekmon, hour00, minute00), # Midnight on Monday, before start of the next work week.iddelete_old_job_executions,max_instances1,replace_existingTrue,)logger.info(Added weekly job: delete_old_job_executions.)try:logger.info(Starting scheduler...)scheduler.start()except KeyboardInterrupt:logger.info(Stopping scheduler...)scheduler.shutdown()logger.info(Scheduler shut down successfully!)1234这4步是django-apscheduler官网的使用步骤经过测试定时程序已经正常运行了。 现在,数据库中已经有了相关定时任务的记录。
5.修改任务,上一个定时将被清空
将步骤4中的runapscheduler文件的my_job改成my_job2再次python manage.py runapscheduler。 修改的位置
新的定时任务和数据库记录如下:
6.系统中启动和停止定时
在linux系统中我们希望定时任务后台运行需要执行python manage.py runapscheduler 。 其实通过python manage.py命令启动的程序每次都是开启了一个新的进程进程中开启一个线程去处理定时任务。这样启动的定时任务跟django的web服务是分开相互之间影像很低。很多博客将定时卸载views.py文件开始位置定时程序和web服务是在一个进程上会出现阻塞的情况。 因为某些原因我们想关闭定时程序在命令行输入ps -aux | grep python可以查看通过python运行的程序并且可以查看是哪一个py文件我的自定义命令文件是start_crontab.py这里就找start_crontab.py在通过kill 命令停止该进程。
ps -aux | grep python
# 下图的红框就是我们的启动命令蓝框是进程id列,725066就是我们定时任务进程id
kill 7250667.如何记录进程id
其实无论实在linux还是在windows我们是可以记录python manage.py runapscheduler任务的进程id的下边代码会生成一个crontab_pid.txt文件记录进程id方便下一次处理和操作。
import osfrom apscheduler.schedulers.blocking import BlockingScheduler
from django.core.management import BaseCommandfrom feishuapi.tasks import mission_6min
from tianyiapi.settings import BASE_DIRclass Command(BaseCommand):# 帮助文本, 一般备注命令的用途及如何使用。help 更新已有任务的状态# 处理命令行参数可选def add_arguments(self, parser):passdef kill(self, pid):# 本函数用于中止传入pid所对应的进程if os.name nt:# Windows系统cmd taskkill /pid str(pid) /ftry:os.system(cmd)print(pid, killed)except Exception as e:print(e)elif os.name posix:# Linux系统cmd kill str(pid)try:os.system(cmd)print(pid, killed)except Exception as e:print(e)else:print(Undefined os.name)# 核心业务逻辑def handle(self, *args, **options):# 1.读取上次保存的pidfile os.path.join(BASE_DIR, crontab_pid.txt)if os.path.isfile(file):with open(file, r) as f:pid f.read()# 2.如果存在杀死上一次的进程# print(上一次进程, pid)if pid:# 调用kill函数终止进程self.kill(pidpid)with open(file, w) as f:# 3.获取当前进程的pidpid os.getpid()# print(当前进程的pid: , pid)f.write(pid.__str__())try:# 创建调度器BlockingScheduler()scheduler BlockingScheduler()# 添加任务时间间隔为6分钟scheduler.add_job(mission_6min, interval, minutes6, idtest_job1)scheduler.start()except Exception as e:print(e)print(定时任务已启动)
8.对自定义manage命令添加参数控制定时任务的开启和停止
这个目前还没做有时间再补上。
二、django-apscheduler参数应用
django-apscheduler和apscheduler一样我们使用的过程中需要根据不同的场景调整定时任务。
1.jobstore作业存储 scheduler BlockingScheduler(timezonesettings.TIME_ZONE)scheduler.add_jobstore(DjangoJobStore(), default)2.scheduler调度器
BlockingScheduler : 当调度器是你应用中唯一要运行的东西时
BackgroundScheduler : 当你没有运行任何其他框架并希望调度器在你应用的后台执行时使用。
AsyncIOScheduler : 当你的程序使用了asyncio(一个异步框架)的时候使用。
GeventScheduler : 当你的程序使用了gevent(高性能的Python并发框架)的时候使用。
TornadoScheduler : 当你的程序基于Tornado(一个web框架)的时候使用。
TwistedScheduler : 当你的程序使用了Twisted(一个异步框架)的时候使用
QtScheduler : 如果你的应用是一个Qt应用的时候可以使用。
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.background import BackgroundSchedulerscheduler BackgroundScheduler()
scheduler.start() # 此处程序不会发生阻塞scheduler BlockingScheduler()
scheduler.start() # 此处程序会发生阻塞3.executor执行器
通常都是使用默认值特殊需求可以考虑。
# 方式1 线程
from apscheduler.executors.pool import ThreadPoolExecutor
executors {default: ThreadPoolExecutor(20) # 最多20个线程同时执行
}
scheduler BackgroundScheduler(executorsexecutors)# 方式2 进程
from apscheduler.executors.pool import ProcessPoolExecutor
executors {default: ProcessPoolExecutor(3) # 最多3个进程同时运行
}
scheduler BackgroundScheduler(executorsexecutors)
4.trigger触发器
APScheduler 有三种内建的 trigger:
date: 特定的时间点触发 interval: 固定时间间隔触发 cron: 在特定时间周期性地触发
from datetime import date# 在2020年11月11日00:00:00执行
sched.add_job(my_job, date, run_datedate(2020, 11, 11))# 在2020年11月1日16:30:05
sched.add_job(my_job, date, run_datedatetime(2020, 11, 11, 16, 30, 5))
sched.add_job(my_job, date, run_date2009-11-06 16:30:05)# 立即执行
sched.add_job(my_job, date)
sched.start()from datetime import datetime# 时间间隔可选seconds 、minutes、hours、days、weeks、start_date、end_date 、timezone
# 每两小时执行一次
sched.add_job(job_function, interval, hours2)# 在2010年10月10日09:30:00 到2014年6月15日的时间内每两小时执行一次
sched.add_job(job_function, interval, hours2, start_date2010-10-10 09:30:00, end_date2014-06-15 11:00:00)# 可选周期second、minute、hour、day、day_of_week、month、year、start_date、end_date、timezone
# 在6、7、8、11、12月的第三个周五的00:00, 01:00, 02:00和03:00 执行
sched.add_job(job_function, cron, month6-8,11-12, day3rd fri, hour0-3)# 在2014年5月30日前的周一到周五的5:30执行
sched.add_job(job_function, cron, day_of_weekmon-fri, hour5, minute30, end_date2014-05-30)5.job任务管理
通常添加job有2中方式。
第一种方式调度器条用add_job方法
def my_job():# Your job processing logic here...print(123)pass
job scheduler.add_job(my_job, interval, minutes2) # 添加任务第二种方式装饰器执行
scheduler BlockingScheduler(timezonesettings.TIME_ZONE)
scheduler.add_jobstore(DjangoJobStore(), default)
register_job(scheduler, interval, seconds1)
def dummy_job():print(Im a job!)
scheduler.start()涉及到具体定时任务可以通过id参数进行管理。
# 方式1 通过对象
job scheduler.add_job(myfunc, interval, minutes2) # 添加任务
job.remove() # 移除任务
job.pause() # 暂停任务
job.resume() # 恢复任务# 方式2 通过任务id
scheduler.add_job(myfunc, interval, minutes2, idmy_job_id) # 添加任务
scheduler.remove_job(my_job_id) # 移除任务
scheduler.pause_job(my_job_id) # 暂停任务
scheduler.resume_job(my_job_id) # 恢复任务scheduler.shutdown() # 停止任务
6.监听定时任务
# coding:utf-8from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED
import datetime
import logginglogger logging.getLogger(job)
logging.basicConfig(levellogging.INFO,format %(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s,datefmt %Y-%m-%d %H:%M:%S,filename mylog.txt,filemode a)任务1
def my_job(x):print(datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S), x)任务2
def test_job(x):print(datetime.datetime.now().strftime(%Y-%m-%d %H:%M:%S), x)print(1 / 0)def job_listener(Event):job scheduler.get_job(Event.job_id)if not Event.exception:print(任务正常运行)logger.info(jobname%s|jobtrigger%s|jobtime%s|retval%s, job.name, job.trigger,Event.scheduled_run_time, Event.retval)else:print(任务出错了)logger.error(jobname%s|jobtrigger%s|errcode%s|exception[%s]|traceback[%s]|scheduled_time%s, job.name,job.trigger, Event.code,Event.exception, Event.traceback, Event.scheduled_run_time)scheduler BlockingScheduler()
scheduler.add_job(functest_job, args(一次性任务,会出错,),next_run_timedatetime.datetime.now() datetime.timedelta(seconds15), iddate_task)
scheduler.add_job(funcmy_job, args(循环任务,), triggerinterval, seconds3, idinterval_task)
scheduler.add_listener(job_listener, EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_EXECUTED)
scheduler._logger logging
scheduler.start()总结
以上就是我对定时任务的理解在接下来的工作中可以更好的应用定时任务。