首頁  >  文章  >  科技週邊  >  PyTorch 並行訓練 DistributedDataParallel 完整程式碼範例

PyTorch 並行訓練 DistributedDataParallel 完整程式碼範例

WBOY
WBOY轉載
2023-04-10 20:51:011213瀏覽

使用大型資料集訓練大型深度神經網路 (DNN) 的問題是深度學習領域的主要挑戰。隨著 DNN 和資料集規模的增加,訓練這些模型的計算和記憶體需求也會增加。這使得在計算資源有限的單一機器上訓練這些模型變得困難甚至不可能。使用大型資料集訓練大型DNN 的一些主要挑戰包括:

  • 訓練時間長:訓練過程可能需要數週甚至數月才能完成,這取決於模型的複雜性和資料集的大小。
  • 記憶體限制:大型 DNN 可能需要大量記憶體來儲存訓練期間的所有模型參數、梯度和中間激活。這可能會導致記憶體不足錯誤並限制可在單一機器上訓練的模型的大小。

為了應對這些挑戰,已經開發了各種技術來擴大具有大型資料集的大型DNN 的訓練,包括模型並行性、資料並行性和混合並行性,以及硬體、軟體和演算法的最佳化。

在本文中我們將示範使用 PyTorch 的資料並行性和模型並行性。

PyTorch 並行訓練 DistributedDataParallel 完整程式碼範例

我們所說的平行性一般是指在多個gpu,或多台機器上訓練深度神經網路(dnn),以實現更少的訓練時間。資料並行背後的基本想法是將訓練資料分成更小的區塊,讓每個GPU或機器處理一個單獨的資料區塊。然後將每個節點的結果組合起來,用於更新模型參數。在資料並行中,模型體系結構在每個節點上是相同的,但模型參數在節點之間進行了分區。每個節點使用分配的資料塊訓練自己的本地模型,在每次訓練迭代結束時,模型參數在所有節點之間同步。這個過程不斷重複,直到模型收斂到一個令人滿意的結果。

下面我們用用ResNet50和CIFAR10資料集來進行完整的程式碼範例:

在資料並行中,模型架構在每個節點上保持相同,但模型參數在節點之間進行了分區,每個節點使用分配的資料塊訓練自己的本地模型。

PyTorch的DistributedDataParallel 函式庫可以進行跨節點的梯度和模型參數的高效能通訊和同步,實現分散式訓練。本文提供如何使用ResNet50和CIFAR10資料集使用PyTorch實現資料並行的範例,其中程式碼在多個gpu或機器上運行,每台機器處理訓練資料的一個子集。訓練過程使用PyTorch的DistributedDataParallel 函式庫進行並行化。

導入必須要的函式庫

import os
 from datetime import datetime
 from time import time
 import argparse
 import torchvision
 import torchvision.transforms as transforms
 import torch
 import torch.nn as nn
 import torch.distributed as dist
 from torch.nn.parallel import DistributedDataParallel

接下來,我們將檢查GPU。

import subprocess
 result = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
 print(result.stdout.decode())

因為我們需要在多個伺服器上運行,所以手動一個一個執行並不現實,所以需要有一個調度程式。這裡我們使用SLURM檔案來執行程式碼(slurm針對Linux和Unix類似核心的免費和開源工作調度程式),

def main():
 
 # get distributed configuration from Slurm environment
 
 parser = argparse.ArgumentParser()
 parser.add_argument('-b', '--batch-size', default=128, type =int,
 help='batch size. it will be divided in mini-batch for each worker')
 parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
 help='number of total epochs to run')
 parser.add_argument('-c','--checkpoint', default=None, type=str,
 help='path to checkpoint to load')
 args = parser.parse_args()
 
 rank = int(os.environ['SLURM_PROCID'])
 local_rank = int(os.environ['SLURM_LOCALID'])
 size = int(os.environ['SLURM_NTASKS'])
 master_addr = os.environ["SLURM_SRUN_COMM_HOST"]
 port = "29500"
 node_id = os.environ['SLURM_NODEID']
 ddp_arg = [rank, local_rank, size, master_addr, port, node_id]
 train(args, ddp_arg)

然後,我們使用DistributedDataParallel 函式庫來執行分散式訓練。

def train(args, ddp_arg):
 
 rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg
 
 # display info
 if rank == 0:
 #print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
 print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
 #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
 
 print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
 
 
 # configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
 #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
 dist.init_process_group(
 backend='nccl',
 init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
 world_size=size,
 rank=rank
)
 
 # distribute model
 torch.cuda.set_device(local_rank)
 gpu = torch.device("cuda")
 #model = ResNet18(classes=10).to(gpu)
 model = torchvision.models.resnet50(pretrained=False).to(gpu)
 ddp_model = DistributedDataParallel(model, device_ids=[local_rank])
 if args.checkpoint is not None:
 map_location = {'cuda:%d' % 0: 'cuda:%d' % local_rank}
 ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
 
 # distribute batch size (mini-batch)
 batch_size = args.batch_size
 batch_size_per_gpu = batch_size // size
 
 # define loss function (criterion) and optimizer
 criterion = nn.CrossEntropyLoss()
 optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)
 
 
 transform_train = transforms.Compose([
 transforms.RandomCrop(32, padding=4),
 transforms.RandomHorizontalFlip(),
 transforms.ToTensor(),
 transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
 
 # load data with distributed sampler
 #train_dataset = torchvision.datasets.CIFAR10(root='./data',
 # train=True,
 # transform=transform_train,
 # download=False)
 
 # load data with distributed sampler
 train_dataset = torchvision.datasets.CIFAR10(root='./data',
train=True,
transform=transform_train,
download=False)
 
 train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
 num_replicas=size,
 rank=rank)
 
 train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size_per_gpu,
shuffle=False,
num_workers=0,
pin_memory=True,
sampler=train_sampler)
 
 # training (timers and display handled by process 0)
 if rank == 0: start = datetime.now()
 total_step = len(train_loader)
 
 for epoch in range(args.epochs):
 if rank == 0: start_dataload = time()
 
 for i, (images, labels) in enumerate(train_loader):
 
 # distribution of images and labels to all GPUs
 images = images.to(gpu, non_blocking=True)
 labels = labels.to(gpu, non_blocking=True)
 
 if rank == 0: stop_dataload = time()
 
 if rank == 0: start_training = time()
 
 # forward pass
 outputs = ddp_model(images)
 loss = criterion(outputs, labels)
 
 # backward and optimize
 optimizer.zero_grad()
 loss.backward()
 optimizer.step()
 
 if rank == 0: stop_training = time()
 if (i + 1) % 10 == 0 and rank == 0:
 print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs,
 i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000,
(stop_training - start_training)*1000))
 if rank == 0: start_dataload = time()
 
 #Save checkpoint at every end of epoch
 if rank == 0:
 torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
 
 if rank == 0:
 print(">>> Training complete in: " + str(datetime.now() - start))
 
 
 if __name__ == '__main__':
 
 main()

程式碼將資料和模型分割到多個gpu上,並以分散式的方式更新模型。以下是程式碼的一些解釋:

train(args, ddp_arg)有兩個參數,args和ddp_arg,其中args是傳遞給腳本的命令列參數,ddp_arg包含分散式訓練相關參數。

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg:解包ddp_arg中分散式訓練相關參數。

如果rank為0,則列印目前使用的gpu數量和主節點IP位址資訊。

dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) :使用NCCL後端初始化分散式進程組。

torch.cuda.set_device(local_rank):為這個程序選擇指定的GPU。

model = torchvision.models. ResNet50 (pretrained=False).to(gpu):從torchvision模型載入ResNet50模型,並將其移至指定的gpu。

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]):將模型包裝在DistributedDataParallel模組中,也就是說這樣我們就可以進行分散式訓練了

載入CIFAR-10資料集並應用資料增強轉換。

train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank):建立一個DistributedSampler對象,將資料集分割到多個gpu上。

train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler,num_workers=0,pin_memory=True,sampler=train_sampler):建立一個DataLoader):建立一個DataLoader到模型中,這與我們平常訓練的步驟是一致的只不過是增加了一個分散式的資料採樣DistributedSampler。

為指定的epoch數訓練模型,以分散式的方式使用optimizer.step()更新權重。

rank0在每個輪結束時保存一個檢查點。

rank0每10个批次显示损失和训练时间。

结束训练时打印训练模型所花费的总时间也是在rank0上。

代码测试

在使用1个节点1/2/3/4个gpu, 2个节点6/8个gpu,每个节点3/4个gpu上进行了训练Cifar10上的Resnet50的测试如下图所示,每次测试的批处理大小保持不变。完成每项测试所花费的时间以秒为单位记录。随着使用的gpu数量的增加,完成测试所需的时间会减少。当使用8个gpu时,需要320秒才能完成,这是记录中最快的时间。这是肯定的,但是我们可以看到训练的速度并没有像GPU数量增长呈现线性的增长,这可能是因为Resnet50算是一个比较小的模型了,并不需要进行并行化训练。

PyTorch 並行訓練 DistributedDataParallel 完整程式碼範例

在多个gpu上使用数据并行可以显著减少在给定数据集上训练深度神经网络(DNN)所需的时间。随着gpu数量的增加,完成训练过程所需的时间减少,这表明DNN可以更有效地并行训练。

这种方法在处理大型数据集或复杂的DNN架构时特别有用。通过利用多个gpu,可以加快训练过程,实现更快的模型迭代和实验。但是需要注意的是,通过Data Parallelism实现的性能提升可能会受到通信开销和GPU内存限制等因素的限制,需要仔细调优才能获得最佳结果。

以上是PyTorch 並行訓練 DistributedDataParallel 完整程式碼範例的詳細內容。更多資訊請關注PHP中文網其他相關文章!

陳述:
本文轉載於:51cto.com。如有侵權,請聯絡admin@php.cn刪除