使用大型数据集训练大型深度神经网络 (DNN) 的问题是深度学习领域的主要挑战。 随着 DNN 和数据集规模的增加,训练这些模型的计算和内存需求也会增加。 这使得在计算资源有限的单台机器上训练这些模型变得困难甚至不可能。 使用大型数据集训练大型 DNN 的一些主要挑战包括:
为了应对这些挑战,已经开发了各种技术来扩大具有大型数据集的大型 DNN 的训练,包括模型并行性、数据并行性和混合并行性,以及硬件、软件和算法的优化。
在本文中我们将演示使用 PyTorch 的数据并行性和模型并行性。
我们所说的并行性一般是指在多个gpu,或多台机器上训练深度神经网络(dnn),以实现更少的训练时间。数据并行背后的基本思想是将训练数据分成更小的块,让每个GPU或机器处理一个单独的数据块。然后将每个节点的结果组合起来,用于更新模型参数。在数据并行中,模型体系结构在每个节点上是相同的,但模型参数在节点之间进行了分区。每个节点使用分配的数据块训练自己的本地模型,在每次训练迭代结束时,模型参数在所有节点之间同步。这个过程不断重复,直到模型收敛到一个令人满意的结果。
下面我们用用Re.NET50和CIFAR10数据集来进行完整的代码示例:
在数据并行中,模型架构在每个节点上保持相同,但模型参数在节点之间进行了分区,每个节点使用分配的数据块训练自己的本地模型。
PyTorch的DistributedDataParallel 库可以进行跨节点的梯度和模型参数的高效通信和同步,实现分布式训练。本文提供了如何使用ResNet50和CIFAR10数据集使用PyTorch实现数据并行的示例,其中代码在多个gpu或机器上运行,每台机器处理训练数据的一个子集。训练过程使用PyTorch的DistributedDataParallel 库进行并行化。
import os
from datetime import datetime
from time import time
import argparse
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel
接下来,我们将检查GPU。
import subprocess
result = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
print(result.stdout.decode())
因为我们需要在多个服务器上运行,所以手动一个一个执行并不现实,所以需要有一个调度程序。这里我们使用SLURM文件来运行代码(slurm面向linux和Unix类似内核的免费和开源工作调度程序),
def main():
# get distributed configuration from Slurm environment
parser = argparse.ArgumentParser()
parser.add_argument('-b', '--batch-size', default=128, type =int,
help='batch size. it will be divided in mini-batch for each worker')
parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
help='number of total epochs to run')
parser.add_argument('-c','--checkpoint', default=None, type=str,
help='path to checkpoint to load')
args = parser.parse_args()
rank = int(os.environ['SLURM_PROCID'])
local_rank = int(os.environ['SLURM_LOCALID'])
size = int(os.environ['SLURM_NTASKS'])
master_addr = os.environ["SLURM_SRUN_COMM_HOST"]
port = "29500"
node_id = os.environ['SLURM_NODEID']
ddp_arg = [rank, local_rank, size, master_addr, port, node_id]
train(args, ddp_arg)
然后,我们使用DistributedDataParallel 库来执行分布式训练。
def train(args, ddp_arg):
rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg
# display info
if rank == 0:
#print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
#print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
# configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
#dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
dist.init_process_group(
backend='nccl',
init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
world_size=size,
rank=rank
)
# distribute model
torch.cuda.set_device(local_rank)
gpu = torch.device("cuda")
#model = ResNet18(classes=10).to(gpu)
model = torchvision.models.resnet50(pretrained=False).to(gpu)
ddp_model = DistributedDataParallel(model, device_ids=[local_rank])
if args.checkpoint is not None:
map_location = {'cuda:%d' % 0: 'cuda:%d' % local_rank}
ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
# distribute batch size (mini-batch)
batch_size = args.batch_size
batch_size_per_gpu = batch_size // size
# define loss function (criterion) and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)
transform_train = transforms.Compose([
transforms.RandomCrop(32, padding=4),
transforms.RandomHorizontalFlip(),
transforms.ToTensor(),
transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
# load data with distributed sampler
#train_dataset = torchvision.datasets.CIFAR10(root='./data',
# train=True,
# transform=transform_train,
# download=False)
# load data with distributed sampler
train_dataset = torchvision.datasets.CIFAR10(root='./data',
train=True,
transform=transform_train,
download=False)
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
num_replicas=size,
rank=rank)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size_per_gpu,
shuffle=False,
num_workers=0,
pin_memory=True,
sampler=train_sampler)
# training (timers and display handled by process 0)
if rank == 0: start = datetime.now()
total_step = len(train_loader)
for epoch in range(args.epochs):
if rank == 0: start_dataload = time()
for i, (images, labels) in enumerate(train_loader):
# distribution of images and labels to all GPUs
images = images.to(gpu, non_blocking=True)
labels = labels.to(gpu, non_blocking=True)
if rank == 0: stop_dataload = time()
if rank == 0: start_training = time()
# forward pass
outputs = ddp_model(images)
loss = criterion(outputs, labels)
# backward and optimize
optimizer.zero_grad()
loss.backward()
optimizer.step()
if rank == 0: stop_training = time()
if (i + 1) % 10 == 0 and rank == 0:
print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs,
i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000,
(stop_training - start_training)*1000))
if rank == 0: start_dataload = time()
#Save checkpoint at every end of epoch
if rank == 0:
torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
if rank == 0:
print(">>> Training complete in: " + str(datetime.now() - start))
if __name__ == '__main__':
main()
代码将数据和模型分割到多个gpu上,并以分布式的方式更新模型。下面是代码的一些解释:
train(args, ddp_arg)有两个参数,args和ddp_arg,其中args是传递给脚本的命令行参数,ddp_arg包含分布式训练相关参数。
rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分布式训练相关参数。
如果rank为0,则打印当前使用的gpu数量和主节点IP地址信息。
dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL后端初始化分布式进程组。
torch.cuda.set_device(local_rank):为这个进程选择指定的GPU。
model = torchvision.models. ResNet50 (pretrained=False).to(gpu):从torchvision模型中加载ResNet50模型,并将其移动到指定的gpu。
ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):将模型包装在DistributedDataParallel模块中,也就是说这样我们就可以进行分布式训练了
加载CIFAR-10数据集并应用数据增强转换。
train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):创建一个DistributedSampler对象,将数据集分割到多个gpu上。
train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler):创建一个DataLoader对象,数据将批量加载到模型中,这与我们平常训练的步骤是一致的只不过是增加了一个分布式的数据采样DistributedSampler。
为指定的epoch数训练模型,以分布式的方式使用optimizer.step()更新权重。
rank0在每个轮次结束时保存一个检查点。
rank0每10个批次显示损失和训练时间。
结束训练时打印训练模型所花费的总时间也是在rank0上。
在使用1个节点1/2/3/4个gpu, 2个节点6/8个gpu,每个节点3/4个gpu上进行了训练Cifar10上的Resnet50的测试如下图所示,每次测试的批处理大小保持不变。完成每项测试所花费的时间以秒为单位记录。随着使用的gpu数量的增加,完成测试所需的时间会减少。当使用8个gpu时,需要320秒才能完成,这是记录中最快的时间。这是肯定的,但是我们可以看到训练的速度并没有像GPU数量增长呈现线性的增长,这可能是因为Resnet50算是一个比较小的模型了,并不需要进行并行化训练。
在多个gpu上使用数据并行可以显著减少在给定数据集上训练深度神经网络(DNN)所需的时间。随着gpu数量的增加,完成训练过程所需的时间减少,这表明DNN可以更有效地并行训练。
这种方法在处理大型数据集或复杂的DNN架构时特别有用。通过利用多个gpu,可以加快训练过程,实现更快的模型迭代和实验。但是需要注意的是,通过Data Parallelism实现的性能提升可能会受到通信开销和GPU内存限制等因素的限制,需要仔细调优才能获得最佳结果。