如何做网站企划案,工程建设教育网首页,软件开发人员外包,百度旗下所有app列表说明
之前已经完全跑通了任务调度#xff0c;实现了S2S的流转Python 全栈系列243 S2S flask_celery。由于request请求用起来比较别扭#xff0c;所以创建一个对象来进行便捷操作。
内容
1 功能
WFlaskAPS包含管理定时任务的必要功能
from datetime import datetime
from…说明
之前已经完全跑通了任务调度实现了S2S的流转Python 全栈系列243 S2S flask_celery。由于request请求用起来比较别扭所以创建一个对象来进行便捷操作。
内容
1 功能
WFlaskAPS包含管理定时任务的必要功能
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel
import requests as req class WFlaskAPS(BaseModel):flask_aps_agent:str IP:Port# 获取当前的任务列表def get_jobs(self):url http://%s/get_jobs % self.flask_aps_agentreturn req.get(url).json()# 删除某个任务def remove_a_task(self, task_name None):url http://%s/remove_a_task/ % self.flask_aps_agentdata_dict {}data_dict[task_id] task_namereturn req.post(url, jsondata_dict).json()# 获取任务状态def get_jobs_status(self):url http://%s/get_jobs_status/ % self.flask_aps_agentreturn req.get(url).json()# 发布一个任务(task)# task_name 其实是job type# task_id 任务的唯一编号# task_type 原来的服务只实现了cron方式因为cron方式可以实现其他两种方式。date在指定的日期和时间运行一次interval在指定时间间隔内运行cron使用Cron表达式运行def publish_a_task(self,task_id None ,task_name None, task_type cron, task_kwargs {},year None, month None, day None, week None, day_of_week None, hour None,minute None, second None, start_date None, end_date None,):url http://%s/publish_a_task/ % self.flask_aps_agentdata_dict {task_id:task_id,task_name:task_name,task_type:task_type,task_kwargs:task_kwargs,year:year,month:month,day:day,week:week,day_of_week:day_of_week,hour:hour,minute:minute,second:second,start_date:start_date,end_date:end_date}return req.post(url, jsondata_dict).json()def pause_a_task(self, task_id None):data_dict {}url http://%s/pause_a_task/ % self.flask_aps_agentdata_dict[task_id] task_idreturn req.post(url, jsondata_dict).json()def resume_a_task(self, task_id None):data_dict {}data_dict[task_id] task_idurl http://%s/resume_a_task/ % self.flask_aps_agentreturn req.post(url, jsondata_dict).json()# task_namedef add_a_job(self, fpath None, func_name None):url http://%s/add_task_type/ % self.flask_aps_agentdata {}data[func_name] func_namewith open(fpath, r) as f: data[func_body] f.read()return req.post(url, jsondata).json()从逻辑上首先需要创建任务类型(add_a_job),之后就可以依据这个job发布n个task发布任务时的参数是最复杂的。原始的flask apscheduler其实提供了三种触发类型
1 date 一次性触发2 interval 周期触发3 cron 触发
由于cron触发可以涵盖前两种的变化所以在之前的服务中只创建了cron格式的定时任务。与linux系统里的cron不同系统里最小的周期是分钟而这里是秒。
默认情况下周期任务是每秒执行的。如果需要改为n秒执行可以用 */5’的方式指定重复执行的周期。通过start_date和end_date可以钳制任务的周期跨度所以当然也包含了一次性的任务。
使用方法比直接请求接口简洁多了
from Basefuncs import *
# flask_aps_agent IP:PORT
wf WFlaskAPS()# 1 获取任务列表
wf.get_jobs()
# 2 删除一个任务
wf.remove_a_task(my_test1)
# 3 获取任务状态
wf.get_jobs_status()
# 4 发布一个任务
cur_dt_str get_time_str1()
wf.publish_a_task(task_id my_test1, task_name hello,start_datecur_dt_str,second */5)
# 5 暂停一个任务
wf.pause_a_task(task_id my_test1)
# 6 恢复任务
wf.resume_a_task(task_id my_test1)# 7 增加一个job(task_name)
# wf.add_a_job()2 Next
2.1 flask_aps_job_table
创建一张表里面存储了job的元信息表可以存在mymeta.flask_aps下面。
字段解释job_namejob 名称description描述para_dict参数样例
2.2 flask_aps_task_table
字段解释machine机器名, m1,m2task_id任务idjob_name任务类型set_to_status被设定的状态running_status当前状态start_dt开始时间end_dt结束时间interval_params周期时间
机器名是重要的因为同样的任务可能在不同的机器上执行。
2.3 assure_tasks.py
这个脚本伴随服务的启动会执行一次
1 根据本机名称去flask_aps_task_table寻找哪些被设定为运行的任务2 获取当前的任务列表根据任务名运行状态的差集执行 1 发布任务。这种情况一般是服务重启之后运行任务丢失了。
其他类型的操作可以后续通过前端来交互这里又会用一下MongoEngine).
2.4 S2S Work Mode
创建一个S2S然后让Worker根据这个来执行一些任务。