企业网站建设公司,视频网站建设应该注意什么,楼市最新消息价格,网站开发的源码近年来#xff0c;深度学习模型的规模越来越大#xff0c;需要处理的数据也越来越多#xff0c;单卡训练的显存空间和计算效率都越来越难以满足需求。因此#xff0c;多卡并行训练成为了一个必要的解决方案本文主要介绍使用 Pytorch 的 DistributedDataParallel#xff08…近年来深度学习模型的规模越来越大需要处理的数据也越来越多单卡训练的显存空间和计算效率都越来越难以满足需求。因此多卡并行训练成为了一个必要的解决方案本文主要介绍使用 Pytorch 的 DistributedDataParallelDDP库进行分布式数据并行训练的方法 文章目录 1. 多卡并行简介1.1 两种并行形式1.2 Pytorch 中的多卡并行 2. 使用 DDP 进行单机多卡训练2.1 原理概述2.2 使用 DDP 改写单卡训练代码 1. 多卡并行简介
多卡并行训练主要用于解决以下几个问题 相同 batch size 下加速训练多卡并行可以将数据分为多份同时在不同的GPU上运行从而大大加快训练速度相同速度下使用更大的 batch size多卡并行可以在多个GPU之间共享显存允许我们设置更大的 batch size增加可训练的模型规模有些模型参数多到单卡训练无法承受而多卡并行可以将模型放入多个GPU中从而扩充可训练模型的规模
1.1 两种并行形式 多卡并行训练有数据并行和模型并行两种形式 数据并行每个GPU都保存一个模型副本训练数据划分成多份交给各个GPU计算梯度然后汇总梯度更新模型参数。根据梯度汇总的方式数据并行又可以分成 Parameter Server 和 Ring All-Reduce 两种前者使用一个 master GPU 汇总梯度更新参数再将参数分发给各个模型后者以环的形式互相传递梯度每个GPU都维护一个优化器各自汇总梯度并自行更新模型参数。Ring All-Reduce 方案能更高效地利用所有卡的上下行带宽是目前的主流方案 模型并行将模型切分成多个部分放在不同的GPU上并行运行每个GPU负责处理一部分模型参数并将处理后的结果发送到其它GPU进行合并从而实现整体模型的更新。这种操作目前并不常见一是因为大部分模型单卡都放得下二是因为通讯开销比数据并行多。根据模型切分方式模型并行也可以分成 Pipelined Parallelism 和 Tensor Slicing 两种前者将模型的各个层放到不同的 GPU 上运行这种做法比较通用但是效率不高后者针对模型中各种模块attention、FFN 等的张量计算操作进行拆解把 tensor 计算分块分散到不同的机器上进行并行效率较高但是通用性差 关于各种并行方法的详细说明可以参考分布式训练、混合精度训练、梯度累加…一文带你优雅地训练大型模型
1.2 Pytorch 中的多卡并行
随着各种深度学习框架的日趋完善很多并行方法已经被整合其中这让实现多卡并行加速训练变得相对简单。Pytorch 中提供了 DPDataParallel 和 DDPDistributedDataParallel 两种数据并行方法它们的性能对比如下 红色柱子是 DP绿色柱子是 DDP蓝色柱子是 DDP Apex 混合精度训练。注意到 DDP 的表现大幅优于 DP这是因为 DP 使用 Parameter Server 方式汇聚梯度并更新参数主卡计算负载和通信带宽需求相比其他卡都显著高导致主卡的计算能力和上下行带宽成为性能瓶颈DDP 使用更高效的 Ring All-Reduce 方案基本实现了 “使用几块GPU就是几倍加速” 的效果 接下来本文会介绍使用 DDP 进行多卡加速的具体做法参考自Pytorch 官方教程
2. 使用 DDP 进行单机多卡训练
2.1 原理概述
DDP 会在每个 GPU 上运行一个进程每个进程中都有一套完全相同的 Trainer 副本包括 model 和 optimizer各个进程之间通过一个进程池进行通信。这里有几个术语 node多机多卡运行时每个机器称为一个 “node”其中每一张卡都可以运行一个并行进程world size所有并行进程的总数各个 node 上并行的GPU总数rank所有 node 的所有进程中各个进程的标识符号是从0开始计数的整数local rank当前 node 的所有进程中各个进程的标识符号是从0开始计数的整数group所有并行的进程组成一个 group进程池只有组内的进程间才可以相互通信
2.2 使用 DDP 改写单卡训练代码 考虑如何将以下单机单卡代码改为 DDP 单机多卡运行 # 单 GPU 训练示例
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoaderclass Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int, ) - None:self.gpu_id gpu_idself.model model.to(gpu_id)self.train_data train_dataself.optimizer optimizerself.save_every save_everydef _run_batch(self, source, targets):self.optimizer.zero_grad()output self.model(source)loss F.cross_entropy(output, targets)loss.backward()self.optimizer.step()def _run_epoch(self, epoch):b_sz len(next(iter(self.train_data))[0])print(f[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)})for source, targets in self.train_data:source source.to(self.gpu_id)targets targets.to(self.gpu_id)self._run_batch(source, targets)def _save_checkpoint(self, epoch):ckp self.model.state_dict()PATH checkpoint.pttorch.save(ckp, PATH)print(fEpoch {epoch} | Training checkpoint saved at {PATH})def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)if epoch % self.save_every 0:self._save_checkpoint(epoch)class MyTrainDataset(Dataset):def __init__(self, size):self.size sizeself.data [(torch.rand(20), torch.rand(1)) for _ in range(size)]def __len__(self):return self.sizedef __getitem__(self, index):return self.data[index]def load_train_objs():train_set MyTrainDataset(2048) # load your datasetmodel torch.nn.Linear(20, 1) # load your modeloptimizer torch.optim.SGD(model.parameters(), lr1e-3)return train_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_sizebatch_size,pin_memoryTrue,shuffleTrue)def main(device, total_epochs, save_every, batch_size):dataset, model, optimizer load_train_objs()train_data prepare_dataloader(dataset, batch_size)trainer Trainer(model, train_data, optimizer, device, save_every)trainer.train(total_epochs)if __name__ __main__:import argparseparser argparse.ArgumentParser(descriptionsimple distributed training job)parser.add_argument(--total-epochs, typeint, default50, helpTotal epochs to train the model)parser.add_argument(--save-every, typeint, default10, helpHow often to save a snapshot)parser.add_argument(--batch_size, default32, typeint, helpInput batch size on each device (default: 32))args parser.parse_args()device 0 # shorthand for cuda:0main(device, args.total_epochs, args.save_every, args.batch_size)将单卡训练代码改写为 DDP 并行的要点如下 引入 DDP 相关库# 使用 DistributedDataParallel 进行单机多卡训练
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import os# 对 python 多进程的一个 pytorch 包装用于后续分发进程
import torch.multiprocessing as mp
# 这个 sampler 可以把采样的数据分散到各个 CPU 上
from torch.utils.data.distributed import DistributedSampler
# 实现分布式数据并行的核心类
from torch.nn.parallel import DistributedDataParallel as DDP
# 各个进程之间通过一个进程池进行通信这两个方法来初始化和销毁进程池
from torch.distributed import init_process_group, destroy_process_group 在程序入口初始化进程池在程序出口销毁进程池def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):# 初始化进程池ddp_setup(rank, world_size)# 进行训练dataset, model, optimizer load_train_objs()train_data prepare_dataloader(dataset, batch_size)trainer Trainer(model, train_data, optimizer, rank, save_every)trainer.train(total_epochs)# 销毁进程池destroy_process_group()使用 DistributedDataParallel 包装模型这样模型才能在各个进程间同步参数self.model DDP(model, device_ids[gpu_id]) # model 要用 DDP 包装一下包装后 model 变成了一个 DDP 对象要访问其参数得这样写 self.model.module.state_dict()构造 Dataloader 时使用 DistributedSampler 作为 sampler这个采样器可以自动将数量为 batch_size 的数据分发到各个GPU上并保证数据不重叠def prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_sizebatch_size,pin_memoryTrue,shuffleFalse, # 设置了新的 sampler参数 shuffle 要设置为 False samplerDistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU它能避免数据重叠)注意需要在各 epoch 入口调用该 sampler 对象的 set_epoch() 方法否则每个 epoch 加载的样本顺序都不变运行过程中单独控制某个进程进行某些操作比如要想保存 ckpt由于每张卡里都有完整的模型参数所以只需要控制一个进程保存即可。需要注意的是使用 DDP 改写的代码会在每个 GPU 上各自运行因此需要在程序中获取当前 GPU 的 rankgpu_id这样才能对针对性地控制各个 GPU 的行为if self.gpu_id 0 and epoch % self.save_every 0:self._save_checkpoint(epoch)使用 torch.multiprocessing.spawn 方法将代码分发到各个 GPU 的进程中执行# 利用 mp.spawn在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法并能设置要传入 fn 的参数 args
# 注意不需要传入 fn 的 rank 参数它由 mp.spawn 自动分配
world_size torch.cuda.device_count()
mp.spawn(fnmain, args(world_size, args.save_every, args.total_epochs, args.batch_size), nprocsworld_size
)完整的修改版代码如下请参考注释自行对比 # 使用 DistributedDataParallel 进行单机多卡训练
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import os# 对 python 多进程的一个 pytorch 包装
import torch.multiprocessing as mp# 这个 sampler 可以把采样的数据分散到各个 CPU 上
from torch.utils.data.distributed import DistributedSampler # 实现分布式数据并行的核心类
from torch.nn.parallel import DistributedDataParallel as DDP # DDP 在每个 GPU 上运行一个进程其中都有一套完全相同的 Trainer 副本包括model和optimizer
# 各个进程之间通过一个进程池进行通信这两个方法来初始化和销毁进程池
from torch.distributed import init_process_group, destroy_process_group def ddp_setup(rank, world_size):setup the distribution process groupArgs:rank: Unique identifier of each processworld_size: Total number of processes# MASTER Node运行 rank0 进程多机多卡时的主机用来协调各个 Node 的所有进程之间的通信os.environ[MASTER_ADDR] localhost # 由于这里是单机实验所以直接写 localhostos.environ[MASTER_PORT] 12355 # 任意空闲端口init_process_group(backendnccl, # Nvidia CUDA CPU 用这个 ncclrankrank, world_sizeworld_size)torch.cuda.set_device(rank)class Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int,) - None:self.gpu_id gpu_idself.model model.to(gpu_id)self.train_data train_dataself.optimizer optimizerself.save_every save_every # 指定保存 ckpt 的周期self.model DDP(model, device_ids[gpu_id]) # model 要用 DDP 包装一下def _run_batch(self, source, targets):self.optimizer.zero_grad()output self.model(source)loss F.cross_entropy(output, targets)loss.backward()self.optimizer.step()def _run_epoch(self, epoch):b_sz len(next(iter(self.train_data))[0])print(f[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)})self.train_data.sampler.set_epoch(epoch) # 在各个 epoch 入口调用 DistributedSampler 的 set_epoch 方法是很重要的这样才能打乱每个 epoch 的样本顺序for source, targets in self.train_data: source source.to(self.gpu_id)targets targets.to(self.gpu_id)self._run_batch(source, targets)def _save_checkpoint(self, epoch):ckp self.model.module.state_dict() # 由于多了一层 DDP 包装通过 .module 获取原始参数 PATH checkpoint.pttorch.save(ckp, PATH)print(fEpoch {epoch} | Training checkpoint saved at {PATH})def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)# 各个 GPU 上都在跑一样的训练进程这里指定 rank0 进程保存 ckpt 以免重复保存if self.gpu_id 0 and epoch % self.save_every 0:self._save_checkpoint(epoch)class MyTrainDataset(Dataset):def __init__(self, size):self.size sizeself.data [(torch.rand(20), torch.rand(1)) for _ in range(size)]def __len__(self):return self.sizedef __getitem__(self, index):return self.data[index]def load_train_objs():train_set MyTrainDataset(2048) # load your datasetmodel torch.nn.Linear(20, 1) # load your modeloptimizer torch.optim.SGD(model.parameters(), lr1e-3)return train_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_sizebatch_size,pin_memoryTrue,shuffleFalse, # 设置了新的 sampler参数 shuffle 要设置为 False samplerDistributedSampler(dataset) # 这个 sampler 自动将数据分块后送个各个 GPU它能避免数据重叠)def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):# 初始化进程池ddp_setup(rank, world_size)# 进行训练dataset, model, optimizer load_train_objs()train_data prepare_dataloader(dataset, batch_size)trainer Trainer(model, train_data, optimizer, rank, save_every)trainer.train(total_epochs)# 销毁进程池destroy_process_group()if __name__ __main__:import argparseparser argparse.ArgumentParser(descriptionsimple distributed training job)parser.add_argument(--total-epochs, typeint, default50, helpTotal epochs to train the model)parser.add_argument(--save-every, typeint, default10, helpHow often to save a snapshot)parser.add_argument(--batch_size, default32, typeint, helpInput batch size on each device (default: 32))args parser.parse_args()world_size torch.cuda.device_count()# 利用 mp.spawn在整个 distribution group 的 nprocs 个 GPU 上生成进程来执行 fn 方法并能设置要传入 fn 的参数 args# 注意不需要 fn 的 rank 参数它由 mp.spawn 自动分配mp.spawn(fnmain, args(world_size, args.save_every, args.total_epochs, args.batch_size), nprocsworld_size)