网站优化心得,长沙seo在哪,晋中市科技馆网站建设,wordpress php推送示例在现代应用开发中#xff0c;异步任务处理是一个常见的需求。无论是数据处理、图像生成#xff0c;还是复杂的计算任务#xff0c;异步执行都能显著提升系统的响应速度和吞吐量。今天#xff0c;我们将通过一个实际项目#xff0c;探索如何使用 FastAPI、Celery 和 Redis …在现代应用开发中异步任务处理是一个常见的需求。无论是数据处理、图像生成还是复杂的计算任务异步执行都能显著提升系统的响应速度和吞吐量。今天我们将通过一个实际项目探索如何使用 FastAPI、Celery 和 Redis 构建一个高性能的异步任务引擎。
项目背景
技术栈介绍
FastAPI一个现代、高性能的 Web 框架基于 Python 3.7 的异步编程特性构建。它支持自动生成 OpenAPI 文档和 Swagger UI能够快速构建 RESTful API并且具有极低的延迟和高并发处理能力。Celery一个分布式任务队列系统主要用于处理异步任务和定时任务。它支持多种消息传输机制能够将任务分发到多个工作节点上并行处理从而提高系统的吞吐量和响应速度。Redis一个高性能的键值存储系统常用于缓存、消息队列和分布式锁等场景。在 Celery 中Redis 通常作为消息代理Broker和结果存储Backend负责任务的分发和结果的持久化。
项目目标
通过 FastAPI、Celery 和 Redis 的结合构建一个能够高效处理用户提交的 Python 代码的异步任务引擎。用户可以通过 API 提交代码系统会异步执行代码并返回任务的执行结果。
项目目录结构
project/
├── main.py
├── utils.py
├── schemas.py
└── app/├── __init__.py├── config.py└── tasks/├── __init__.py└── tasks.py代码功能深度解析
1. main.pyFastAPI 应用的核心
main.py 是项目的核心入口文件负责定义 FastAPI 应用的接口和逻辑。
FastAPI 应用初始化
app FastAPI(titleAsync Task API, description, version1.0.0)这里我们创建了一个 FastAPI 应用命名为 Async Task API版本为 1.0.0。
自定义 Swagger UI
def swagger_monkey_patch(*args, **kwargs):return get_swagger_ui_html(*args,**kwargs,swagger_js_urlhttps://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui-bundle.js,swagger_css_urlhttps://cdn.bootcdn.net/ajax/libs/swagger-ui/5.6.2/swagger-ui.min.css,)
applications.get_swagger_ui_html swagger_monkey_patch通过 Monkey Patch 的方式我们自定义了 Swagger UI 的资源加载路径使用了国内的 CDN 加速资源提升文档加载速度。
全局异常处理
app.exception_handler(Exception)
def validation_exception_handler(request, err):base_error_message fFailed to execute: {request.method}: {request.url}return JSONResponse(status_code400, content{message: f{base_error_message}. Detail: {err}})我们定义了一个全局异常处理器捕获所有未处理的异常并返回一个包含错误信息的 JSON 响应。
HTTP 中间件计算请求处理时间
app.middleware(http)
async def add_process_time_header(request, call_next):start_time time.time()response await call_next(request)process_time time.time() - start_timeresponse.headers[X-Process-Time] str(f{process_time:0.4f} sec)return response这个中间件用于计算每个请求的处理时间并将处理时间添加到响应头 X-Process-Time 中方便调试和性能优化。
创建任务的 API
app.post(/tasks)
def create_pytask(task: schemas.PyTask):code task.codetime_limit task.time_limitexpires task.expiresresult execute_python_code.apply_async(args(code,), time_limittime_limit, expiresexpires)return JSONResponse(content{task_id: result.id})用户可以通过 /tasks 接口提交 Python 代码代码会被异步执行。任务的执行结果可以通过 /tasks/{task_id} 接口查询。
查询任务结果的 API
app.get(/tasks/{task_id}, response_modelschemas.PyTaskResult)
def get_task_result(task_id: str):return get_task_info(task_id)用户可以通过 /tasks/{task_id} 接口查询任务的执行结果和状态。
2. utils.py任务信息获取工具
utils.py 文件定义了一个工具函数 get_task_info用于获取 Celery 任务的状态和结果。
def get_task_info(task_id):task_result AsyncResult(task_id, appapp)result {task_id: task_id,task_status: task_result.status,task_result: task_result.result}return result通过 AsyncResult我们可以获取任务的当前状态如 PENDING、SUCCESS、FAILURE 等和执行结果。
3. schemas.py数据模型定义
schemas.py 文件定义了 Pydantic 模型用于验证和序列化请求和响应的数据。
任务请求模型
class PyTask(BaseModel):code: strexpires: Optional[int] Nonetime_limit: Optional[int] None用户提交的任务请求包含以下字段
code: 任务的 Python 代码。expires: 任务的过期时间可选。time_limit: 任务的时间限制可选。
任务结果模型
class PyProgramResult(BaseModel):status: stroutput: Optional[str] Noneerror: Optional[str] None任务的执行结果包含以下字段
status: 任务的执行状态如 success 或 failure。output: 任务的标准输出可选。error: 任务的错误输出可选。
任务结果响应模型
class PyTaskResult(BaseModel):task_id: strtask_status: strtask_result: Optional[PyProgramResult] None任务的查询结果包含以下字段
task_id: 任务的 ID。task_status: 任务的状态如 PENDING、SUCCESS 等。task_result: 任务的执行结果可选。
4. app/__init__.pyCelery 应用初始化
app/__init__.py 文件是 Celery 应用的初始化文件主要用于配置 Celery 应用和任务的自动发现。
创建 Celery 应用
app Celery(my_celery_project)我们创建了一个名为 my_celery_project 的 Celery 应用。
加载配置
app.config_from_object(app.config)从 app.config 文件中加载 Celery 的配置。
自动发现任务
app.autodiscover_tasks([app.tasks])自动发现 app.tasks 模块中的任务。
Worker 和 Beat 初始化
worker_init.connect
def worker_initialization(**kwargs):print(Worker 初始化开始)beat_init.connect
def beat_initialization(**kwargs):print(Beat 初始化开始)定义了 Worker 和 Beat 的初始化函数分别在 Worker 和 Beat 启动时执行。
5. app/config.pyCelery 配置
app/config.py 文件定义了 Celery 的配置。
消息代理和结果存储
broker_url redis://:redisisthebestredis:6379/0
result_backend redis://:redisisthebestredis:6379/0使用 Redis 作为消息代理和结果存储。
任务结果过期时间
result_expires 3600任务结果在 Redis 中保存 1 小时后过期。
序列化配置
task_serializer json
result_serializer json
accept_content [json]使用 JSON 作为任务和结果的序列化格式。
时区配置
timezone Asia/Shanghai
enable_utc True设置时区为 Asia/Shanghai并启用 UTC 时间。
6. app/tasks/tasks.py任务执行逻辑
app/tasks/tasks.py 文件定义了一个 Celery 任务 execute_python_code用于执行用户提交的 Python 代码。
app.task
def execute_python_code(code_string):temp_file temp_code.pywith open(temp_file, w) as f:f.write(code_string)try:result subprocess.run([python3, temp_file],stdoutsubprocess.PIPE,stderrsubprocess.PIPE,textTrue)if result.stderr:return {status: failure, error: result.stderr}else:return {status: success, output: result.stdout}finally:if os.path.exists(temp_file):os.remove(temp_file)该任务将用户提交的代码字符串保存为临时文件然后使用 subprocess.run 执行该文件捕获标准输出和错误输出。如果执行成功返回 success 状态和标准输出如果执行失败返回 failure 状态和错误输出。最后删除临时文件。
部署分析
version: 3.8services:fastapi:image: lab:python-packagescontainer_name: fastapiports:- 8080:8080volumes:- D:\dockerMount\code\celery:/home/codeworking_dir: /home/codecommand: python3 main.pyrestart: unless-stoppednetworks:- mynetcelery-worker:image: lab:python-packagescontainer_name: celery-workervolumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app worker --concurrency4 --loglevelinforestart: unless-stoppednetworks:- mynetcelery-flower:image: lab:python-packagescontainer_name: celery-flowerports:- 5555:5555volumes:- D:\dockerMount\code\celery:/home/celeryworking_dir: /home/celerycommand: celery -A app flower --port5555restart: unless-stoppednetworks:- mynetredis:image: bitnami/redis:7.2.4-debian-12-r16container_name: redisenvironment:- REDIS_PASSWORDredisisthebestnetworks:- mynetnetworks:mynet:external: false在这个 Docker Compose 配置中我们定义了三个服务
fastapiFastAPI 应用负责接收用户请求并分发任务。celery-workerCelery 工作节点负责执行异步任务。celery-flowerCelery 的监控工具提供任务执行的可视化界面。redisRedis 服务作为 Celery 的消息代理和结果存储。
代码的功能和价值
功能 异步任务执行 用户可以通过 /tasks 接口提交 Python 代码代码会被异步执行。任务的执行结果可以通过 /tasks/{task_id} 接口查询。 任务状态管理 任务的状态如 PENDING、SUCCESS、FAILURE 等可以通过 /tasks/{task_id} 接口查询。 高性能和可扩展性 使用 FastAPI 和 Celery 构建的异步任务引擎能够处理高并发的任务请求。Celery 的分布式特性使得系统可以轻松扩展以应对更多的任务。 安全性 通过设置 time_limit 和 expires可以限制任务的执行时间和过期时间防止恶意代码的长时间执行。 易用性 FastAPI 自动生成的 Swagger UI 使得 API 的使用和调试更加方便。Pydantic 模型确保了请求和响应数据的类型安全。
价值 高效的任务处理 该系统能够高效地处理大量异步任务适用于需要异步执行代码的场景如在线代码执行、数据处理、图像处理等。 可扩展性 通过 Celery 的分布式任务队列系统可以轻松扩展以处理更多的任务适合高并发场景。 安全性 通过限制任务的执行时间和过期时间系统能够有效防止恶意代码的滥用。 易用性 FastAPI 和 Pydantic 的结合使得 API 的开发和维护更加简单同时提供了自动生成的文档和类型检查。 灵活性 系统支持自定义任务的执行逻辑可以根据业务需求扩展任务类型和功能。
总结
通过 FastAPI、Celery 和 Redis 的结合我们构建了一个高性能、可扩展的分布式异步任务引擎。它能够高效地处理用户提交的 Python 代码并提供任务状态查询功能。该系统适用于需要异步执行代码的场景具有高效、安全、易用和灵活的特点。
无论是构建一个在线代码执行平台还是处理复杂的计算任务这个项目都为你提供了一个强大的基础。希望这篇文章能为你带来启发让你在异步任务处理的道路上走得更远
附图
发送任务 查询结果