眉山网站优化,做企业官网还有必要吗,非洲用什么网站做采购,网站空间在哪里买0. Abstract
使用 PyTorch 进行多卡训练, 最简单的是 DataParallel, 仅仅添加一两行代码就可以使模型在多张 GPU 上并行地计算. 但它是比较老的方法, 官方推荐使用新的 Distributed Data Parallel, 更加灵活与强大:
1. Distributed Data Parallel (DDP)
从一个简单的非分布…0. Abstract
使用 PyTorch 进行多卡训练, 最简单的是 DataParallel, 仅仅添加一两行代码就可以使模型在多张 GPU 上并行地计算. 但它是比较老的方法, 官方推荐使用新的 Distributed Data Parallel, 更加灵活与强大:
1. Distributed Data Parallel (DDP)
从一个简单的非分布式训练任务, 到多机器多卡训练. 跟着官方教程走, 刚开始一切都很顺利, 到最后要多机器的时候, 就老是报错: MemoryError: std::bad_alloc.
1.1 DDP 概览 特点:
多个 batch 的数据, 同时分别在多个 GPU 上计算;需要 DistributedSampler 给各 GPU 分发数据 batch, 保证数据不重复;模型在各 GPU 上都有一份副本, 分别计算梯度, 并通过 ring all-reduce 算法整合梯度.
可以理解为: 为每个 GPU 启动一个进程, 这些进程执行着完全相同的代码(你的程序), 不同的地方在于:
吃进了不同的数据样本, 那么计算得到的 loss 和反向传播计算的参数梯度都不同;各进程有自己的编号(rank), 程序中可根据编号执行一些不同的操作, 如保存 checkpoint, 日志输出等操作.
1.2 single_gpu.py
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDatasetclass Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int,) - None:self.gpu_id gpu_idself.model model.to(gpu_id)self.train_data train_dataself.optimizer optimizerself.save_every save_everydef _run_batch(self, source, targets):output self.model(source)loss F.cross_entropy(output, targets)self.optimizer.zero_grad()loss.backward()self.optimizer.step()def _run_epoch(self, epoch):b_sz len(next(iter(self.train_data))[0])print(f[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)})for source, targets in self.train_data:source source.to(self.gpu_id)targets targets.to(self.gpu_id)self._run_batch(source, targets)def _save_checkpoint(self, epoch):ckp self.model.state_dict()PATH checkpoint.pttorch.save(ckp, PATH)print(fEpoch {epoch} | Training checkpoint saved at {PATH})def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)if epoch % self.save_every 0:self._save_checkpoint(epoch)def load_train_objs():train_set MyTrainDataset(2048) # load your datasetmodel torch.nn.Linear(20, 1) # load your modeloptimizer torch.optim.SGD(model.parameters(), lr1e-3)return train_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_sizebatch_size,pin_memoryTrue,shuffleTrue)def main(device, total_epochs, save_every, batch_size):dataset, model, optimizer load_train_objs()train_data prepare_dataloader(dataset, batch_size)trainer Trainer(model, train_data, optimizer, device, save_every)trainer.train(total_epochs)if __name__ __main__:import argparseparser argparse.ArgumentParser(descriptionsimple distributed training job)parser.add_argument(total_epochs, typeint, helpTotal epochs to train the model)parser.add_argument(save_every, typeint, helpHow often to save a snapshot)parser.add_argument(--batch_size, default32, typeint, helpInput batch size on each device (default: 32))args parser.parse_args()device 0 # shorthand for cuda:0main(device, args.total_epochs, args.save_every, args.batch_size)1.3 multi_gpu.py
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datautils import MyTrainDatasetimport torch.multiprocessing as mp
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group
import osdef ddp_setup(rank, world_size):Args:rank: Unique identifier of each processworld_size: Total number of processes# MASTER 表示主节点, 负责分配任务, 启动其他进程os.environ[MASTER_ADDR] localhost # IP address of masteros.environ[MASTER_PORT] 12355 # Port number# This is important to prevent hangs or excessive memory utilization on GPU:0torch.cuda.set_device(rank) # sets the default GPU for each processinit_process_group(backendnccl, rankrank, world_sizeworld_size)class Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,gpu_id: int,save_every: int,) - None:self.gpu_id gpu_idself.train_data train_dataself.optimizer optimizerself.save_every save_everyself.model DDP( # 感觉有点重复, 上面 ddp_setup 已经设置过默认 device 了model.to(gpu_id), # 这里要先将模型放到 gpu_id 号 GPU 上, 否则 DDP 会报错device_ids[gpu_id], # 那么这里再设置 device_ids 干嘛? 是可以分布到多个 GPU 上吗?)def _run_batch(self, source, targets):output self.model(source)loss F.cross_entropy(output, targets)self.optimizer.zero_grad()loss.backward()self.optimizer.step()def _run_epoch(self, epoch):b_sz len(next(iter(self.train_data))[0])# len(self.train_data)} 将会被分割为 num_device 份print(f[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)})# sampler.set_epoch(epoch) is necessary to make shuffling work properly across multiple epochs.# Otherwise, the same ordering will be used in each epoch.self.train_data.sampler.set_epoch(epoch) # 这里加了一句, 是为了保证每个 epoch 的数据是随机的for source, targets in self.train_data:source source.to(self.gpu_id)targets targets.to(self.gpu_id)self._run_batch(source, targets)def _save_checkpoint(self, epoch):ckp self.model.module.state_dict() # 因为 self.model 引用的是 DDP 对象, 所以想访问模型参数, 则需要 .modulePATH checkpoint.pttorch.save(ckp, PATH)print(fEpoch {epoch} | Training checkpoint saved at {PATH})def train(self, max_epochs: int):for epoch in range(max_epochs):self._run_epoch(epoch)if self.gpu_id 0 and epoch % self.save_every 0: # 主进程才保存self._save_checkpoint(epoch)def load_train_objs():train_set MyTrainDataset(2048) # load your datasetmodel torch.nn.Linear(20, 1) # load your modeloptimizer torch.optim.SGD(model.parameters(), lr1e-3)return train_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_sizebatch_size,pin_memoryTrue,shuffleFalse, # 有了 DistributedSampler, 这里就不用 shuffle 了, 不过 default 已经是 FalsesamplerDistributedSampler(dataset))def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):rank: Unique identifier of each process, GPU ID, 也是进程的 ID, 0~world_size-1world_size: Total number of processes, 总共 GPU 数量ddp_setup(rank, world_size) # 先设置当前子进程dataset, model, optimizer load_train_objs() # 之后似乎都一样, 甚至数据,模型,优化器都是各进程都创建train_data prepare_dataloader(dataset, batch_size)trainer Trainer(model, train_data, optimizer, rank, save_every)trainer.train(total_epochs)destroy_process_group() # 结尾销毁进程组if __name__ __main__:import argparseparser argparse.ArgumentParser(descriptionsimple distributed training job)parser.add_argument(total_epochs, typeint, helpTotal epochs to train the model)parser.add_argument(save_every, typeint, helpHow often to save a snapshot)parser.add_argument(--batch_size, default32, typeint, helpInput batch size on each device (default: 32))args parser.parse_args()world_size torch.cuda.device_count()# spawn processes, 自动创建进程, 并且把 rank 作为第一个参数传入 mainmp.spawn(main, args(world_size, args.save_every, args.total_epochs, args.batch_size), nprocsworld_size)更改代码仅仅需要几个步骤:
构建进程组: init_process_group(...) destroy_process_group() main 函数被当作子进程启动, 每个子进程启动开头由 init_process_group(...) 构建进程组, 结尾由 destroy_process_group() 销毁进程组;DistributedDataParallel 包装模型 其实主要还是持有参数的模型, 至于计算部分, 不要紧, 每个子进程都在执行相同的计算过程(除非设置了 if rank... 的条件), 只会是参数梯度不同, 被包装后的模型参数会自动在进程组之间同步; 注意包装前先将模型移动到 GPU 上.DistributedSampler 均匀地将样本分给每个子进程 如果样本数不够整除, 则会将前几个样本补到末尾, 凑够整除, 注意是打乱后的前几个, 相当于随机补几个样本; 如果设置了 batch_size32, 那么每个进程都会得到 32 个样本, 实际的 batch_size32*num_gpus; 容易误解的地方在于, 实际 batch_size 增大了, 那么我求 loss 时用 mean 的话, 会不会降低梯度大小? 不会, 有些博主说要 learning_rate*num_gpus, 但实际上人家的 ring all-reduce 算法是把各进程上的梯度相加的, 相当于执行了多次梯度更新, 只不过是在相同的参数上, 而不是像单卡更新多次, 每次梯度计算在更新之后的不同的参数上. 每个子进程中访问的 DataLoader 中 batch 数会变为原来的 1/num_gpus, 而 len(dataset) 不会.每个 epoch 开始时, 调用 train_loader.sampler.set_epoch(epoch), 否则, 将在每个 epoch 中使用相同的顺序.设置 if rank0 为保存 checkpoint 的条件, 以保证只保存几个相同模型的其中一个. 聚合操作, 如你想整合各进程计算的不同结果并保存, 不应在 if rank 0 内, 聚合操作需要在每个进程中执行. 原因下面会解释.spawn 翻译过来就是下蛋, 意思是启动子进程, 可以看到, 相当于执行了多个 main 函数; rank 参数是自动传给 main 函数的. BatchNorm 是根据数据计算均值和标准差的, 所以每个 GPU 上计算的都不一样, 如果想合成一个完整的大 Batch, 需要 SyncBatchNorm 同步. 1.4 multigpu_torchrun.py
import osimport torch
import torch.nn.functional as F
from torch import distributed
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSamplerfrom datautils import MyTrainDatasetdef ddp_setup():都不用设置主机地址和端口号了, 直接一个 LOCAL_RANKtorch.cuda.set_device(int(os.environ[LOCAL_RANK]))distributed.init_process_group(backendnccl)class Trainer:def __init__(self,model: torch.nn.Module,train_data: DataLoader,optimizer: torch.optim.Optimizer,save_every: int,snapshot_path: str,) - None:self.gpu_id int(os.environ[LOCAL_RANK]) # 这里也是自动获取 LOCAL_RANKself.model model.to(self.gpu_id)self.train_data train_dataself.optimizer optimizerself.save_every save_everyself.epochs_run 0self.snapshot_path snapshot_pathif os.path.exists(snapshot_path):print(Loading snapshot)self._load_snapshot(snapshot_path)self.model DDP(self.model, device_ids[self.gpu_id])def _load_snapshot(self, snapshot_path):loc fcuda:{self.gpu_id}snapshot torch.load(snapshot_path, map_locationloc, weights_onlyTrue) # 每个 GPU 都要加载self.model.load_state_dict(snapshot[MODEL_STATE]) # 之所以是 model.load_state_dict, 是因为在 DDP 之前self.epochs_run snapshot[EPOCHS_RUN]print(fResuming training from snapshot at Epoch {self.epochs_run})def _run_batch(self, source, targets):print(source.shape[0])output self.model(source)loss F.mse_loss(output, targets)self.optimizer.zero_grad()loss.backward()self.optimizer.step()return lossdef _run_epoch(self, epoch):b_sz len(next(iter(self.train_data))[0])print(f[GPU{self.gpu_id}] fEpoch {epoch} | fBatchsize: {b_sz} | fSteps: {len(self.train_data)} | # data_loader 会 / num_devicesfdsize: {len(self.train_data.dataset)} # 而数据集大小还是原来的)self.train_data.sampler.set_epoch(epoch)loss_epoch 0for source, targets in self.train_data:source source.to(self.gpu_id)targets targets.to(self.gpu_id)loss self._run_batch(source, targets)loss_epoch lossprint(f[GPU{self.gpu_id}] Loss {loss_epoch.item()})distributed.all_reduce(loss_epoch, opdistributed.ReduceOp.AVG)print(f[GPU{self.gpu_id}] Loss {loss_epoch.item()})def _save_snapshot(self, epoch):snapshot {MODEL_STATE: self.model.module.state_dict(), # 之后就要用 module 了EPOCHS_RUN: epoch,}torch.save(snapshot, self.snapshot_path)print(fEpoch {epoch} | Training snapshot saved at {self.snapshot_path})def train(self, max_epochs: int):for epoch in range(self.epochs_run, max_epochs):self._run_epoch(epoch)if self.gpu_id 0 and epoch % self.save_every 0:self._save_snapshot(epoch)def load_train_objs():train_set MyTrainDataset(101) # load your datasetmodel torch.nn.Linear(20, 1) # load your modeloptimizer torch.optim.SGD(model.parameters(), lr1e-3)return train_set, model, optimizerdef prepare_dataloader(dataset: Dataset, batch_size: int):return DataLoader(dataset,batch_sizebatch_size,pin_memoryTrue,shuffleFalse,# 这个 DistributedSampler 会自动把数据集平均分给每个 GPU, 只是每个 DataLoader 得到的下标是 len(dataset) / num_devices 个# 原来的 len(dataloder.dataset) 还是 len(dataset)# 注意会补全, 最后每个 GPU 都会得到相同的数据, 而不是最后一个 GPU 会少得# 那补了之后, 样本数是比源数据集多一些, 测试呢, 也就有偏差, 当你有上万个测试样本时, 多出来的几个样本影响不大samplerDistributedSampler(dataset))def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str snapshot.pt):不带 rank 了, 直接用 LOCAL_RANKddp_setup()dataset, model, optimizer load_train_objs()train_data prepare_dataloader(dataset, batch_size)trainer Trainer(model, train_data, optimizer, save_every, snapshot_path)trainer.train(total_epochs)distributed.destroy_process_group()if __name__ __main__:import argparseparser argparse.ArgumentParser(descriptionsimple distributed training job)parser.add_argument(total_epochs, typeint, helpTotal epochs to train the model)parser.add_argument(save_every, typeint, helpHow often to save a snapshot)parser.add_argument(--batch_size, default32, typeint, helpInput batch size on each device (default: 32))args parser.parse_args()main(args.save_every, args.total_epochs, args.batch_size) # 不管 rank 和 device执行命令:
torchrun --standalone --nproc_per_node2 multigpu_torchrun.py 50 10
# 如果设置 --nproc_per_nodegpu, 则自动检测可用 gpu 数量, 并为每个 gpu 启动一个进程.这里使用了不同的启动方式 torchrun, 本质还是一样的, 特点:
能自动重启 当训练出现意外而中断时, torchrun 会自动重启, 如果保存了 checkpoint 并设置了自动加载程序, 那么就可以接着训.设置了环境变量 “LOCAL_RANK” 你可以在程序中使用 os.environ[LOCAL_RANK]) 访问当前进程的 rank 号了. 不过我感觉仅仅是在 distributed.init_process_group(backendnccl) 之前使用, 后来的地方你可以接续这么干, 但构建进程组后有一个函数 distributed.get_node_local_rank() 可以获取进程号.单卡也可以跑, 设置 --nproc_per_node1.
1.5 同步操作
模型参数可以通过 DDP 自动地同步, 那如果我想聚合所有子进程上计算的 loss 呢? 或者我在测试时, 想聚合测试结果? 官网的这个小教程没教. 查阅博客才得知需要用 distributed.all_reduce(...).
上面的 multigpu_torchrun.py 中, 我已经对 loss 添加了这个同步:
print(f[GPU{self.gpu_id}] Loss {loss_epoch.item()})
distributed.all_reduce(loss_epoch, opdistributed.ReduceOp.AVG)
print(f[GPU{self.gpu_id}] Loss {loss_epoch.item()})
########## output ##########
[GPU1] Loss 0.3411558270454407
[GPU0] Loss 0.29943281412124634
[GPU1] Loss 0.3202943205833435
[GPU0] Loss 0.3202943205833435两个 GPU 计算的 loss 分别为 0.3411558270454407 和 0.29943281412124634, 经过同步, 都变为了 0.3202943205833435.
注意:
只可对 torch.Tensor 执行同步, 其他类型的如 Python int 和 np.ndarray 都不行.可以选择其他聚合操作, 如 opdistributed.ReduceOp.SUM 表示相加: 具体可见: Collective Functions.聚合操作不应在 if rank 0 内; 聚合操作需要在每个进程中执行.
1.6 会出现模型加载错误
如果刚用 torch.save(...) 保存了模型, 立刻就使用 torch.load(...) 加载, 那么很可能会出现错误:
[rank1]: RuntimeError: PytorchStreamReader failed reading zip archive: failed finding central directory原因不明.
解决办法:
time.sleep(1)
torch.load(...)等 1s 再加载就不出错了.
2. 总结
看起来比较复杂, 但如果构建对 Distributed Data Parallel 的认知框架, 一切都变得简单:
DDP 为每个 GPU 启动一个子进程, 它们执行完全相同的代码;distributed.init_process_group(backendnccl) 构建进程组, 程序结束时 distributed.destroy_process_group() 销毁进程组;DistributedSampler(dataset) 为每个子进程分发不重叠的等分的 Dataset 子集, 实现数据并行;用 DDP 对象包装模型, 就能在进程组中同步梯度和参数; 叫 ring all-reduce 算法;你可以用 all_reduce 等操作实现进程间的张量同步;还可以根据进程的 rank 号对不同子进程执行略有不同的操作, 如保存模型操作.