系统官网网站模板下载,企业关键词排名优化哪家好,南通通州建设工程质量监督网站,wordpress post下载说明 最大的好处是方便。 其实所有任务的源头#xff0c;应该都是通过定时的方式#xff0c;在每个时隙发起轮询。当然在任务的后续传递中#xff0c;可以通过CallBack或者WebHook的方式#xff0c;以事件的形态进行。这样可以避免长任务执行的过程中进行等待和轮询。
总结…说明 最大的好处是方便。 其实所有任务的源头应该都是通过定时的方式在每个时隙发起轮询。当然在任务的后续传递中可以通过CallBack或者WebHook的方式以事件的形态进行。这样可以避免长任务执行的过程中进行等待和轮询。
总结一下源头是定时轮询中间过程是事件传递。
本次使用APS搭建本地定时任务的目的是为了简化实验性质的定时任务通过在git项目下进行编辑任务脚本和执行任务清单而运行容器本身会周期性的自动拉取代码然后按照任务清单执行。
执行过程采用多线程方式任务的负载通常都不高。整体设计上复杂和繁重的任务会包在微服务中定时任务主要是向这些微服务发起触发动作。通常微服务收到触发元信息后进行自动的任务/数据拉取处理处理完毕后通过webhook将结果持久化或进一步发起其他的触发动作。
另外具有共性的任务将会被提取出来之后会交给celery以分布协程方式执行这些任务包括
1 数据库IO。例如从队列里取数存到数据库中。2 网络数据获取IO。爬取网页、或者通过接口获取数据。3 接口化标准操作。按url, json input这样的标准web请求这种灵活性很强。表面上是一个IO动作但背后可能触发密集计算但是又不需要celery集群承担。可能是ray集群、dask集群、基于显卡计算的集群
内容
1 读取任务列表
主要为了简单的读入任务(脚本),同时可以方便的进行注释
# 用于将代表任务列表的数据读入
# 去掉换行和空格
# 如果以# 号开头表示注释
def read_all_lines_clean(fpath):with open(fpath, r) as f:lines f.readlines()lines1 [x.replace(\n,).strip() for x in lines]lines2 [x for x in lines1 if len(x) and not x.startswith(#)]return lines2任务文件如下task_list.txt
task_01_probably_git_pull.py
task_02_del_event_null_recs.py
# task_03_sync_xs_backup.py
#task_04_rotate_data.py
# task_05_sync_milvus.py
#task_06_rotate_mysql_time.py读入后
In [4]: a read_all_lines_clean(task_list.txt)In [5]: a
Out[5]: [task_01_probably_git_pull.py, task_02_del_event_null_recs.py]这些就是之后要定时调度的任务
2 并行执行
为了使得每一次定时任务都可以执行且保证效率需要用一些简单的调度容错问题均在脚本内解决。调度器可以保证每30秒起来一次。
线程的并行执行
def exe_tasks_threads(task_list_file base_config.task_list_file, project_folder base_config.project_folder):tasks read_all_lines_clean(project_folder task_list_file)dedup_tasks remove_duplicates_preserve_order(tasks)pytask_list [ {some_path:base_config.project_folderx} for x in dedup_tasks]thread_concurrent_run(os_system_python, keyword_args_listpytask_list, max_workers 50)
每一次执行os_system_python
import subprocessdef os_system_python(some_pathNone, timeout30):try:result subprocess.run([python3, some_path], timeouttimeout)return resultexcept subprocess.TimeoutExpired:print(fTask {some_path} timed out after {timeout} seconds.)return None
代码说明
subprocess.run:这是 subprocess 模块的高级 API用于运行命令并等待其完成。它支持 timeout 参数如果命令在指定时间内未完成会抛出 TimeoutExpired 异常。timeout 参数:你设置了默认超时时间为 30 秒这是一个合理的默认值。如果任务在 30 秒内未完成subprocess.run 会抛出 TimeoutExpired 异常。异常处理:捕获 TimeoutExpired 异常后打印超时信息并返回 None。这样可以避免程序因超时而崩溃同时提供清晰的日志信息。3 自动更新
更新git项目作为一个任务脚本被周期执行。由于代码更新并不是高频事件所以一般概率上保证5分钟会更新一次代码。
(base) root76a14afa199b:/workspace/local_aps_v2/base# python3 task_01_probably_git_pull.py
2000-01-01 08:00:00
2000-01-01 08:00:00
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ HTTP/1.1 200 OK
task_01_probably_git_pull running
2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ HTTP/1.1 200 OK
Git pull executed successfully for branch master:
Already up to date.2025-01-29 19:47:19 - httpx - INFO - HTTP Request: POST http://192.168.0.4:24132/send_msg/ HTTP/1.1 200 OK
(base) root76a14afa199b:/workspace/local_aps_v2/base#4 定时调度
调度器在每分钟的0/30秒执行我把30秒定为一拍(pace),一分钟定位一时隙(slot)。绝大部分任务都应该在30秒内完成。
# 执行本地脚本
from datetime import datetime
import os
from apscheduler.schedulers.blocking import BlockingSchedulerfrom base_config import base_config
from Basefuncs import *
def exe_tasks_threads(task_list_file base_config.task_list_file, project_folder base_config.project_folder):tasks read_all_lines_clean(project_folder task_list_file)dedup_tasks remove_duplicates_preserve_order(tasks)pytask_list [ {some_path:base_config.project_folderx} for x in dedup_tasks]thread_concurrent_run(os_system_python, keyword_args_listpytask_list, max_workers 50)# 后台启动命令 nohup python3 /root/prj27_timetask/cron_task/test_001.py /dev/null 21 if __name__ __main__:# 创建调度器sche1 BlockingScheduler()# 添加任务使用 cron 表达式每分钟的第 0 秒和第 30 秒执行sche1.add_job(exe_tasks_threads,cron,second0,30, # 每分钟的第 0 秒和第 30 秒kwargs{},coalesceTrue,max_instances1)print([S] Starting scheduler with cron (0s and 30s of every minute)...)try:sche1.start() # 启动调度器except (KeyboardInterrupt, SystemExit):print([S] Scheduler stopped.)5 Docker运行
为了保证执行的稳定性使用docker执行
docker run -d --namelocal_aps_v2 \--restartalways \-v /etc/localtime:/etc/localtime -v /etc/timezone:/etc/timezone -v /etc/hostname:/etc/hostname -e LANGC.UTF-8 \-w /workspace/local_aps_v2/base \YOURIMAGE \sh -c git pull python3 aps.py只有环境改变时才需要修改镜像重发布大部分时候只要调试和修改代码然后推送就可以了。