让我用一个数据分析项目的例子来展示 plan-and-execute 框架的应用。这个例子会涉及数据处理、分析和可视化等任务。
from typing import List, Dict, Any
from dataclasses import dataclass
import json
from enum import Enum
import logging
from datetime import datetime#…让我用一个数据分析项目的例子来展示plan-and-execute框架的应用。这个例子会涉及数据处理、分析和可视化等任务。
from typing import List, Dict, Any
from dataclasses import dataclass
import json
from enum import Enum
import logging
from datetime import datetime# 任务状态枚举
class TaskStatus(Enum):
    """Lifecycle state of a task inside a workflow."""

    PENDING = "pending"      # registered, not yet executed
    RUNNING = "running"      # handler currently executing
    COMPLETED = "completed"  # handler returned normally
    FAILED = "failed"        # handler raised an exception


# Task priority enum
class TaskPriority(Enum):
    """Relative scheduling priority of a task; a larger value is more urgent."""

    LOW = 1
    MEDIUM = 2
    HIGH = 3


# Task definition
@dataclass
class Task:
    """A single unit of work in a workflow.

    ``result`` and ``error`` start as ``None``; the executor fills them in
    when the task completes or fails.
    """

    id: str
    name: str
    description: str
    priority: TaskPriority
    dependencies: List[str]  # IDs of the tasks this task depends on
    status: TaskStatus
    result: Any = None
    error: str = None  # stays None until a failure message is recorded


# Workflow executor
class WorkflowExecutor:
    """Run registered tasks in dependency order, one at a time.

    Handlers are looked up in ``task_handlers`` by task ID and are called as
    ``handler(task, results)`` where ``results`` maps each direct dependency
    ID to that dependency's ``result``.
    """

    def __init__(self):
        self.tasks = {}
        # Initialised here so that a missing handler surfaces as a KeyError
        # for that task ID instead of an AttributeError on the executor.
        self.task_handlers = {}
        self.logger = logging.getLogger(__name__)

    def add_task(self, task: Task):
        """Register ``task`` under its ID, replacing any task with the same ID."""
        self.tasks[task.id] = task

    def get_ready_tasks(self) -> List[Task]:
        """Return pending tasks whose dependencies have all completed.

        The result is ordered from highest to lowest priority. A dependency
        ID that was never registered counts as unmet, so the dependent task
        is never returned (rather than raising ``KeyError``).
        """
        ready_tasks = []
        for task in self.tasks.values():
            if task.status != TaskStatus.PENDING:
                continue
            dependencies_met = all(
                dep_id in self.tasks
                and self.tasks[dep_id].status == TaskStatus.COMPLETED
                for dep_id in task.dependencies
            )
            if dependencies_met:
                ready_tasks.append(task)
        return sorted(ready_tasks, key=lambda x: x.priority.value, reverse=True)

    def execute_task(self, task: Task):
        """Run the handler for ``task`` and record the outcome on the task.

        Any exception raised by the handler (or by the handler lookup) is
        caught: the task is marked FAILED, the message is stored in
        ``task.error`` and logged. Nothing is re-raised.
        """
        task.status = TaskStatus.RUNNING
        try:
            dependency_results = {
                dep: self.tasks[dep].result for dep in task.dependencies
            }
            result = self.task_handlers[task.id](task, dependency_results)
            task.result = result
            task.status = TaskStatus.COMPLETED
        except Exception as e:
            task.status = TaskStatus.FAILED
            task.error = str(e)
            self.logger.error(f"Task {task.id} failed: {e}")

    def execute_workflow(self):
        """Execute tasks until none is ready; return True if all completed.

        Tasks that depend on a failed task stay PENDING and are never
        returned as ready, which is what ends the loop after a failure.
        """
        while True:
            ready_tasks = self.get_ready_tasks()
            if not ready_tasks:
                break
            for task in ready_tasks:
                self.execute_task(task)
        # Check whether every task finished successfully.
        return all(
            task.status == TaskStatus.COMPLETED for task in self.tasks.values()
        )


# Data analysis workflow example
class DataAnalysisWorkflow:
    """Example workflow: load a CSV, clean it, analyse it and write a report.

    Args:
        data_path: Path of the CSV file to read.
        output_path: Directory that receives ``report.json``.
    """

    def __init__(self, data_path: str, output_path: str):
        self.data_path = data_path
        self.output_path = output_path
        self.executor = WorkflowExecutor()

    def plan_workflow(self):
        """Register the fixed task graph with the executor."""
        tasks = [
            Task(
                id="load_data",
                name="加载数据",
                description="从CSV文件加载数据",
                priority=TaskPriority.HIGH,
                dependencies=[],
                status=TaskStatus.PENDING,
            ),
            Task(
                id="clean_data",
                name="数据清洗",
                description="处理缺失值和异常值",
                priority=TaskPriority.HIGH,
                dependencies=["load_data"],
                status=TaskStatus.PENDING,
            ),
            Task(
                id="feature_engineering",
                name="特征工程",
                description="创建新特征",
                priority=TaskPriority.MEDIUM,
                dependencies=["clean_data"],
                status=TaskStatus.PENDING,
            ),
            Task(
                id="statistical_analysis",
                name="统计分析",
                description="计算基本统计指标",
                priority=TaskPriority.MEDIUM,
                dependencies=["clean_data"],
                status=TaskStatus.PENDING,
            ),
            Task(
                id="visualization",
                name="数据可视化",
                description="生成图表",
                priority=TaskPriority.MEDIUM,
                dependencies=["statistical_analysis"],
                status=TaskStatus.PENDING,
            ),
            Task(
                id="generate_report",
                name="生成报告",
                description="生成分析报告",
                priority=TaskPriority.LOW,
                # The executor passes a handler only the results of its
                # *direct* dependencies. generate_report reads the statistics
                # result, so it must be listed here or the handler fails with
                # KeyError.
                dependencies=[
                    "visualization",
                    "feature_engineering",
                    "statistical_analysis",
                ],
                status=TaskStatus.PENDING,
            ),
        ]
        for task in tasks:
            self.executor.add_task(task)

    def register_task_handlers(self):
        """Map every task ID to the bound method that implements it."""
        self.executor.task_handlers = {
            "load_data": self.load_data,
            "clean_data": self.clean_data,
            "feature_engineering": self.feature_engineering,
            "statistical_analysis": self.statistical_analysis,
            "visualization": self.visualization,
            "generate_report": self.generate_report,
        }

    def load_data(self, task: Task, dependencies: Dict):
        """Read the CSV at ``data_path`` into a DataFrame."""
        import pandas as pd

        df = pd.read_csv(self.data_path)
        return df

    def clean_data(self, task: Task, dependencies: Dict):
        """Fill missing values in numeric columns with the column mean."""
        df = dependencies["load_data"]
        # Handle missing values. numeric_only avoids a TypeError when the
        # frame also contains text columns; those columns are left as-is.
        df = df.fillna(df.mean(numeric_only=True))
        # Handle outliers
        # ... other cleaning logic
        return df

    def feature_engineering(self, task: Task, dependencies: Dict):
        """Placeholder for feature creation; returns the cleaned frame."""
        df = dependencies["clean_data"]
        # Create new features
        # ... feature engineering logic
        return df

    def statistical_analysis(self, task: Task, dependencies: Dict):
        """Compute descriptive statistics and correlations of the clean data."""
        df = dependencies["clean_data"]
        stats = {
            "basic_stats": df.describe(),
            # Correlation is only defined for numeric columns.
            "correlations": df.select_dtypes(include="number").corr(),
            # ... other statistical analysis
        }
        return stats

    def visualization(self, task: Task, dependencies: Dict):
        """Placeholder for chart generation; currently returns an empty list."""
        import matplotlib.pyplot as plt

        stats = dependencies["statistical_analysis"]
        figures = []
        # Generate visualizations
        # ... visualization logic
        return figures

    def generate_report(self, task: Task, dependencies: Dict):
        """Assemble the report dict and write it to ``report.json``.

        Returns:
            The report dictionary that was written to disk.
        """
        import os

        figures = dependencies["visualization"]
        df_features = dependencies["feature_engineering"]
        report = {
            "timestamp": datetime.now().isoformat(),
            "statistics": str(dependencies["statistical_analysis"]),
            "features": df_features.columns.tolist(),
            # NOTE(review): matplotlib Figure objects do not appear to offer
            # to_json(); this only works while `figures` is empty or holds
            # objects that do (e.g. plotly figures) - confirm before filling
            # in the visualization step.
            "figures": [figure.to_json() for figure in figures],
        }
        # Save the report, creating the output directory if it is missing.
        if self.output_path:
            os.makedirs(self.output_path, exist_ok=True)
        with open(f"{self.output_path}/report.json", "w", encoding="utf-8") as report_file:
            json.dump(report, report_file, indent=2)
        return report

    def run(self):
        """Plan and execute the whole workflow.

        Returns:
            The final report on success, or ``None`` if any task did not
            complete (failed tasks are printed).
        """
        self.plan_workflow()
        self.register_task_handlers()
        success = self.executor.execute_workflow()
        if success:
            final_report = self.executor.tasks["generate_report"].result
            print("工作流执行成功!")
            return final_report
        failed_tasks = [
            task
            for task in self.executor.tasks.values()
            if task.status == TaskStatus.FAILED
        ]
        print("工作流执行失败。失败的任务:")
        for task in failed_tasks:
            print(f"- {task.name}: {task.error}")
        return None


# Usage example
def main():
    """Run the example workflow on ``data/sales_data.csv`` and print the outcome."""
    workflow = DataAnalysisWorkflow(
        data_path="data/sales_data.csv",
        output_path="output",
    )
    result = workflow.run()
    if result:
        print("分析报告已生成:", result)
    else:
        print("工作流执行失败")


if __name__ == "__main__":
    main()

# This example demonstrates:
工作流框架的核心组件
Task 定义、工作流执行器、依赖管理、状态追踪、错误处理
实现的关键特性
自动任务规划、依赖关系处理、并行任务执行、结果传递、错误恢复
可以扩展的方向
# 1. 添加任务重试机制
class RetryableExecutor(WorkflowExecutor):
    """Executor that re-runs a failed task up to ``max_retries`` times."""

    def execute_task(self, task: Task, max_retries: int = 3):
        """Run ``task``, repeating the attempt while it ends up FAILED.

        The base ``execute_task`` catches handler exceptions itself and only
        records them on the task, so failure is detected from ``task.status``
        and not with ``try``/``except``. Waiting for an exception here would
        never count an attempt, so a failing task would loop forever.

        Args:
            task: The task to execute.
            max_retries: Maximum number of attempts. At least one attempt is
                always made so the task never stays PENDING.
        """
        attempts = max(1, max_retries)
        for attempt in range(1, attempts + 1):
            if attempt > 1:
                self.logger.warning(
                    f"Retry {attempt - 1}/{max_retries} for task {task.id}"
                )
                # Drop the message of the previous failed attempt.
                task.error = None
            super().execute_task(task)
            if task.status == TaskStatus.COMPLETED:
                break


# 2. Add progress monitoring
class MonitoredWorkflow(DataAnalysisWorkflow):
    """Workflow variant that can report progress through a callback."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.progress_callback = None

    def set_progress_callback(self, callback):
        """Store ``callback``; it is later called as ``callback(task, status)``."""
        self.progress_callback = callback

    def update_progress(self, task: Task, status: str):
        """Forward ``task`` and ``status`` to the callback, if one is set."""
        if self.progress_callback:
            self.progress_callback(task, status)


# 3. Add intermediate result caching
import os
import pickle


class CachedExecutor(WorkflowExecutor):
    """Executor that can persist task results as pickle files.

    Args:
        cache_dir: Directory holding one ``<task id>.cache`` file per task.
    """

    def __init__(self, cache_dir: str):
        super().__init__()
        self.cache_dir = cache_dir

    def get_cached_result(self, task: Task):
        """Return the cached result for ``task``, or ``None`` if none exists.

        A cached value of ``None`` cannot be told apart from a cache miss.
        """
        cache_path = f"{self.cache_dir}/{task.id}.cache"
        if os.path.exists(cache_path):
            # SECURITY: pickle.load can execute arbitrary code. Only point
            # cache_dir at a location that untrusted parties cannot write to.
            with open(cache_path, "rb") as cache_file:
                return pickle.load(cache_file)
        return None

    def cache_result(self, task: Task):
        """Write ``task.result`` to the cache, creating the directory if needed."""
        cache_path = f"{self.cache_dir}/{task.id}.cache"
        if self.cache_dir:
            os.makedirs(self.cache_dir, exist_ok=True)
        with open(cache_path, "wb") as cache_file:
            pickle.dump(task.result, cache_file)


# Usage suggestions
# 1. 配置日志
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
)

# 2. Add performance monitoring
from time import time


class PerformanceMonitor:
    """Record wall-clock start and end timestamps per task ID."""

    def __init__(self):
        self.task_times = {}

    def start_task(self, task_id: str):
        """Record the start time of ``task_id``, discarding any earlier record."""
        self.task_times[task_id] = {"start": time()}

    def end_task(self, task_id: str):
        """Record the end time of ``task_id``.

        Raises:
            KeyError: If ``start_task`` was not called for ``task_id`` first.
        """
        self.task_times[task_id]["end"] = time()

    def get_task_duration(self, task_id: str):
        """Return the seconds elapsed between start and end of ``task_id``.

        Raises:
            KeyError: If the task was never started or has not ended yet.
        """
        times = self.task_times[task_id]
        return times["end"] - times["start"]


# 3. Implement graceful termination
import signal


class GracefulWorkflow(DataAnalysisWorkflow):
    """Workflow variant that sets a stop flag when SIGINT (Ctrl+C) arrives.

    Only the flag is set here; nothing in this class reads ``should_stop``,
    so the execution loop has to check it for the workflow to actually stop.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.should_stop = False
        # signal.signal() may only be called from the main thread; creating
        # this object in another thread raises ValueError.
        signal.signal(signal.SIGINT, self.handle_interrupt)

    def handle_interrupt(self, signum, frame):
        """Signal handler: announce the interrupt and request a stop."""
        print("\nReceived interrupt signal. Cleaning up...")
        self.should_stop = True


# This framework can be used in many scenarios, for example:
数据处理管道、ETL 工作流、机器学习实验、报告生成系统、自动化测试流程
关键是要根据具体需求调整任务定义和执行逻辑。