重庆九龙坡营销型网站建设价格,上海网站设计公司,wordpress 视频截图,安吉做企业网站目录
一、Dask介绍
二、使用说明
安装
三、测试
1、单个文件中实现功能
2、运行多个可执行文件 最近在写并行计算相关部分#xff0c;用到了python的Dask库。
Dask官网#xff1a;Dask | Scale the Python tools you love
一、Dask介绍
Dask是一个灵活的并行和分布式…目录
一、Dask介绍
二、使用说明
安装
三、测试
1、单个文件中实现功能
2、运行多个可执行文件 最近在写并行计算相关部分用到了python的Dask库。
Dask官网Dask | Scale the Python tools you love
一、Dask介绍
Dask是一个灵活的并行和分布式计算库旨在处理大规模数据集。它提供了类似于Pandas 和 NumPy 的数据结构但能够有效处理比内存更大的数据集。通过使用Dask可以在单台机器或分布式集群中运行更方便处理大规模数据。
Dask是一个用于Python的并行计算模块从单机多核扩展到拥有数千台机器的数据中心。它既由低级任务API也有更高级面向数据的API。低级任务API支持Dask与多种Python库的集成公共API为围绕Dask发展的各种工具的生态系统提供了基础。
Dask相较于Spark这些大数据处理框架更轻量级。Dask更侧重与其他框架如Numpy、PandasScikit-learning相结合从而使其能更加方便进行分布式并行计算。
Dask存在三种最基本的数据结构分别是Arrays、Dataframes以及Bags。
二、使用说明
安装
pip install dask
python -m pip install dask[array]
python -m pip install dask[distributed]
python -m pip install dask[dataframe]
先测试是否已经安装了模块命令行进入到python3编辑器
from dask.distributed import Client, progress 没有报缺少模块错误则说明是可以正常执行的。
三、测试
1、单个文件中实现功能
下述的主要数据处理在定义计算任务函数calculate_value(num)中即将计算任务函数处理32次。
from dask.distributed import Client, progress
import time# 定义计算任务的函数
def calculate_value(num):num_float float(num) * 0.33num_double float(num) * 0.33 return num_float, num_double# 设置Dask客户端
def setup_client():from dask.distributed import Client, LocalClustercluster LocalCluster()client Client(cluster)scheduler_info client.scheduler_info()ncores sum(worker[nthreads] for worker in scheduler_info[workers].values())print(fConnected to Dask cluster with {ncores} cores)return client# 提交任务并收集结果
def submit_tasks(client, num, num_tasks32):# 创建任务列表tasks [client.submit(calculate_value, num) for _ in range(num_tasks)]# 等待所有任务完成并显示进度progress(tasks)# 收集结果results [task.result() for task in tasks]return results# 主函数
def main():num 558558571 # 这是您要处理的数字client setup_client() # 设置Dask客户端# 提交32个任务results submit_tasks(client, num)# 打印结果for i, (num_float, num_double) in enumerate(results):print(fTask {i1} - num_float: {num_float}, num_double: {num_double})# 关闭客户端连接client.close()if __name__ __main__:main()
运行上述的python程序
python3 my_dask_script.py
执行结果如下 此时表示运行了32个task。
在运行的时候如果提示 表明 dask-scheduler 无法启动原因是端口 8787 已经被占用了。
解决方法
1、查找并终止占用端口 8787 的进程
1先安装lsof
apt install lsof
2查看占用端口进程
lsof -i :87873通过进程的 PID 使用 kill 命令终止该进程
kill -9 PID2、修改 dask-scheduler 使用的端口
dask-scheduler --port 8888再次重新启动查看 dask-scheduler 使用的端口
dask-scheduler2、运行多个可执行文件
我在同目录中创建了一个test.cc文件为简单的打印数据内容如下
#include iostream
#include iomanipint main() {int num 558558571;float num_float static_castfloat(num) * 0.33;double num_double static_castdouble(num) * 0.33;std::cout num value: num std::endl;std::cout std::fixed std::setprecision(2);std::cout num_float value: num_float std::endl;std::cout num_double value: num_double std::endl;return 0;
}
此时将上述的test.cc编译
g -o main test.cc然后新建一个my_dask_script.py文件内容如下
from dask.distributed import Client, LocalCluster
import os# 定义执行外部程序的函数
def run_external_program():cmd ./main # 您的外部程序命令os.system(cmd) # 使用os.system来执行命令# 设置Dask客户端
def setup_client():from dask.distributed import Client, LocalClustercluster LocalCluster()client Client(cluster)scheduler_info client.scheduler_info()ncores sum(worker[nthreads] for worker in scheduler_info[workers].values())print(fConnected to Dask cluster with {ncores} cores)return client# 提交任务到Dask集群
def submit_tasks(client, num_tasks32):futures [client.submit(run_external_program) for _ in range(num_tasks)]return futures# 主函数
def main():client setup_client() # 设置Dask客户端futures submit_tasks(client) # 提交任务# 等待所有任务完成client.gather(futures)# 关闭客户端连接client.close()if __name__ __main__:main()
运行结果 此时表示上述的可执行文件main已运行了32份。