
PyTorch parallel training DistributedDataParallel complete code example

WBOY
2023-04-10 20:51:01

Training large deep neural networks (DNNs) on large datasets is a major challenge in deep learning. As DNN and dataset sizes increase, so do the computational and memory requirements of training. This makes it difficult or even impossible to train these models on a single machine with limited computing resources. The major challenges include:

  • Long training time: Training can take weeks or even months to complete, depending on the complexity of the model and the size of the dataset.
  • Memory limitations: Large DNNs may require large amounts of memory to store all model parameters, gradients, and intermediate activations during training. This can cause out-of-memory errors and limit the size of the model that can be trained on a single machine.

To address these challenges, various techniques have been developed to scale up the training of large DNNs on large datasets, including model parallelism, data parallelism, and hybrid parallelism, as well as hardware, software, and algorithm optimizations.

In this article we will demonstrate data parallelism using PyTorch.


Parallelism here generally means training a deep neural network (DNN) on multiple GPUs or multiple machines to reduce training time. The basic idea behind data parallelism is to split the training data into smaller chunks and let each GPU or machine process a separate chunk. The results from each node are then combined and used to update the model parameters. In data parallelism, every node holds a full copy of the same model, so the architecture and parameters are replicated rather than partitioned; only the data is partitioned. Each node trains its local replica on its allocated chunk of data, and in every training iteration the gradients are averaged across all nodes so that the replicas stay synchronized. This process is repeated until the model converges to a satisfactory result.
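The synchronization idea can be checked numerically without any GPU. The following is a minimal pure-Python sketch, not PyTorch code: it assumes a one-parameter linear model, equal-sized data chunks, and a hypothetical `all_reduce_mean` helper that stands in for the real collective communication. Under those assumptions, averaging the per-worker gradients gives every replica the same update a single machine would compute on the combined batch.

```python
# Toy simulation of synchronous data parallelism; all names are illustrative.

def grad_mse(w, xs, ys):
    """Gradient of the mean squared error for the model y = w * x."""
    n = len(xs)
    return sum(2.0 * (w * x - y) * x for x, y in zip(xs, ys)) / n


def all_reduce_mean(values):
    """Stand-in for a collective all-reduce: every worker receives the mean."""
    mean = sum(values) / len(values)
    return [mean for _ in values]


def data_parallel_step(weights, shards, lr):
    """One synchronous step: local gradients, averaging, identical update."""
    local_grads = [grad_mse(w, xs, ys) for w, (xs, ys) in zip(weights, shards)]
    synced = all_reduce_mean(local_grads)
    return [w - lr * g for w, g in zip(weights, synced)]


xs = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0]
ys = [3.0 * x for x in xs]            # data generated with the true weight 3.0
num_workers = 4
# each worker receives an equal-sized chunk of the data
shards = [(xs[r::num_workers], ys[r::num_workers]) for r in range(num_workers)]

weights = [0.0] * num_workers         # every replica starts from the same weight
single = 0.0                          # reference: one machine, full batch
for _ in range(50):
    weights = data_parallel_step(weights, shards, lr=0.01)
    single = single - 0.01 * grad_mse(single, xs, ys)

print(weights, single)
```

All four replicas hold the same weight after every step, and that weight matches the single-machine reference up to floating-point rounding. With unequal chunk sizes a plain mean of the local gradients would no longer equal the full-batch gradient, which is one reason the chunks are kept the same size.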

Below we use ResNet50 and the CIFAR10 dataset for a complete code example:

To recap, in data parallelism the model is the same on every node, the training data is partitioned between nodes, and each node trains its own local replica on its allocated chunk of data.

PyTorch's DistributedDataParallel module communicates and synchronizes gradients and model parameters efficiently across nodes to enable distributed training. This article shows how to implement data parallelism in PyTorch with ResNet50 and the CIFAR10 dataset: the code runs on multiple GPUs or machines, with each process handling a subset of the training data.

Import the necessary libraries

import os
from datetime import datetime
from time import time
import argparse
import torchvision
import torchvision.transforms as transforms
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel

Next, we will check the GPU.

import subprocess
result = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
print(result.stdout.decode())
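On a machine without the NVIDIA driver, the call above raises `FileNotFoundError` because the `nvidia-smi` executable is missing. A slightly more defensive variant might look like the sketch below; the helper name `gpu_report` is made up for this example, and the query flags are standard `nvidia-smi` options.

```python
import shutil
import subprocess


def gpu_report():
    """Return a short listing of visible GPUs, or a notice if none can be queried."""
    if shutil.which("nvidia-smi") is None:
        return "nvidia-smi not found on PATH"
    result = subprocess.run(
        ["nvidia-smi", "--query-gpu=index,name,memory.total", "--format=csv,noheader"],
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        return "nvidia-smi failed: " + result.stderr.decode(errors="replace").strip()
    return result.stdout.decode(errors="replace").strip()


print(gpu_report())
```

The function always returns a string, so it can be called safely on a login node or a laptop before submitting the job.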

Because the code has to run on multiple servers, launching each process by hand is not practical, so a scheduler is needed. Here we use a SLURM batch script to launch the code (Slurm is a free and open-source job scheduler for Linux and Unix-like systems). The main function reads the distributed configuration from the Slurm environment:

def main():
    # parse command-line arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('-b', '--batch-size', default=128, type=int,
                        help='global batch size; it is divided into mini-batches for each worker')
    parser.add_argument('-e', '--epochs', default=2, type=int, metavar='N',
                        help='number of total epochs to run')
    parser.add_argument('-c', '--checkpoint', default=None, type=str,
                        help='path to checkpoint to load')
    args = parser.parse_args()

    # get distributed configuration from the Slurm environment
    rank = int(os.environ['SLURM_PROCID'])         # global rank of this process
    local_rank = int(os.environ['SLURM_LOCALID'])  # rank of this process on its node
    size = int(os.environ['SLURM_NTASKS'])         # total number of processes
    master_addr = os.environ["SLURM_SRUN_COMM_HOST"]
    port = "29500"
    node_id = os.environ['SLURM_NODEID']
    ddp_arg = [rank, local_rank, size, master_addr, port, node_id]
    train(args, ddp_arg)
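Because `main()` indexes `os.environ` directly, it fails with a `KeyError` when started outside Slurm. For quick local checks, the same settings could be read through a small helper with fallbacks, as in the sketch below. The helper `read_slurm_config` and its single-process defaults (rank 0, world size 1, `127.0.0.1`) are assumptions of this example and are not part of the original script.

```python
import os


def read_slurm_config(env=None, default_port="29500"):
    """Collect the distributed settings used by main() from a Slurm-style environment.

    Falls back to a single-process configuration when the Slurm variables
    are absent (the fallback values are an assumption of this sketch).
    """
    env = os.environ if env is None else env
    return {
        "rank": int(env.get("SLURM_PROCID", 0)),
        "local_rank": int(env.get("SLURM_LOCALID", 0)),
        "size": int(env.get("SLURM_NTASKS", 1)),
        "master_addr": env.get("SLURM_SRUN_COMM_HOST", "127.0.0.1"),
        "port": default_port,
        "node_id": env.get("SLURM_NODEID", "0"),
    }


# Hand-written environment describing the third of eight processes
fake_env = {
    "SLURM_PROCID": "2", "SLURM_LOCALID": "2", "SLURM_NTASKS": "8",
    "SLURM_SRUN_COMM_HOST": "10.0.0.1", "SLURM_NODEID": "0",
}
cfg = read_slurm_config(fake_env)
local_cfg = read_slurm_config({})
print(cfg)
print(local_cfg)
```

Passing the environment as an argument keeps the function easy to test, since no real Slurm allocation is needed to exercise it.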

Then, we use the DistributedDataParallel module to perform distributed training.

def train(args, ddp_arg):
    rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg

    # display info
    if rank == 0:
        print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)

    print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))

    # configure distribution method: define address and port of the master node
    # and initialise the communication backend (NCCL)
    dist.init_process_group(
        backend='nccl',
        init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
        world_size=size,
        rank=rank
    )

    # distribute model: one process per GPU, one model replica per process
    torch.cuda.set_device(local_rank)
    gpu = torch.device("cuda")
    model = torchvision.models.resnet50(pretrained=False).to(gpu)
    ddp_model = DistributedDataParallel(model, device_ids=[local_rank])
    if args.checkpoint is not None:
        # checkpoints are saved by rank 0 (cuda:0); remap them to this process's GPU
        map_location = {'cuda:%d' % 0: 'cuda:%d' % local_rank}
        ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))

    # distribute batch size (mini-batch)
    batch_size = args.batch_size
    batch_size_per_gpu = batch_size // size

    # define loss function (criterion) and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)

    transform_train = transforms.Compose([
        transforms.RandomCrop(32, padding=4),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
    ])

    # load data with distributed sampler
    # (download=False: the dataset must already be present under ./data)
    train_dataset = torchvision.datasets.CIFAR10(root='./data',
                                                 train=True,
                                                 transform=transform_train,
                                                 download=False)

    train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
                                                                    num_replicas=size,
                                                                    rank=rank)

    # shuffle stays False here because the sampler does the shuffling
    train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                               batch_size=batch_size_per_gpu,
                                               shuffle=False,
                                               num_workers=0,
                                               pin_memory=True,
                                               sampler=train_sampler)

    # training (timers and display handled by process 0)
    if rank == 0:
        start = datetime.now()
        # make sure the checkpoint directory exists before the first save
        os.makedirs('./checkpoint', exist_ok=True)
    total_step = len(train_loader)

    for epoch in range(args.epochs):
        # give the sampler the epoch so that each epoch uses a different shuffle
        train_sampler.set_epoch(epoch)
        if rank == 0: start_dataload = time()

        for i, (images, labels) in enumerate(train_loader):

            # move this process's images and labels to its GPU
            images = images.to(gpu, non_blocking=True)
            labels = labels.to(gpu, non_blocking=True)

            if rank == 0: stop_dataload = time()

            if rank == 0: start_training = time()

            # forward pass
            outputs = ddp_model(images)
            loss = criterion(outputs, labels)

            # backward and optimize (gradients are averaged across processes in backward)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if rank == 0: stop_training = time()
            if (i + 1) % 10 == 0 and rank == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(
                    epoch + 1, args.epochs, i + 1, total_step, loss.item(),
                    (stop_dataload - start_dataload) * 1000,
                    (stop_training - start_training) * 1000))
            if rank == 0: start_dataload = time()

        # save a checkpoint at the end of every epoch
        if rank == 0:
            torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch + 1))

    if rank == 0:
        print(">>> Training complete in: " + str(datetime.now() - start))


if __name__ == '__main__':
    main()

The code splits the data across multiple GPUs, keeps a replica of the model on each one, and updates the replicas in a synchronized, distributed manner. Here are some explanations of the code:

train(args, ddp_arg) has two parameters, args and ddp_arg, where args is the command line parameter passed to the script, and ddp_arg contains distributed training related parameters.

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg: Unpack the distributed training related parameters in ddp_arg.

If rank is 0, print the number of GPUs currently used and the master node IP address information.

dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank): Initialize the distributed process group using the NCCL backend.

torch.cuda.set_device(local_rank): Select the specified GPU for this process.

model = torchvision.models.resnet50(pretrained=False).to(gpu): Create a ResNet50 model from torchvision without pretrained weights and move it to the selected GPU.

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]): Wrap the model in the DistributedDataParallel module so that it can be trained in a distributed manner.

Load the CIFAR-10 dataset and apply data augmentation transformations.

train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=size, rank=rank): Create a DistributedSampler object that splits the dataset across the GPUs.
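How the sampler divides the dataset can be sketched in plain Python. The function below follows the documented default behaviour of DistributedSampler (shuffle with a seed shared by all ranks, pad so the total divides evenly, then take a strided slice per rank), but it is only an illustration: `shard_indices` is a made-up name, and the permutation produced by Python's `random` module differs from the one PyTorch generates. In PyTorch the epoch is supplied through the sampler's `set_epoch` method.

```python
import math
import random


def shard_indices(dataset_len, num_replicas, rank, epoch=0, seed=0, shuffle=True):
    """Return the sample indices one rank would see in one epoch.

    Assumes dataset_len >= num_replicas so that the padding step below
    never needs more samples than the dataset holds.
    """
    indices = list(range(dataset_len))
    if shuffle:
        # the same seed on every rank gives the same permutation everywhere
        random.Random(seed + epoch).shuffle(indices)
    # pad by repeating leading samples so the total divides evenly
    per_rank = math.ceil(dataset_len / num_replicas)
    total = per_rank * num_replicas
    indices += indices[: total - dataset_len]
    # strided slice: rank r takes positions r, r + num_replicas, ...
    return indices[rank:total:num_replicas]


shards = [shard_indices(10, 4, r, epoch=0) for r in range(4)]
unshuffled_rank0 = shard_indices(10, 4, 0, shuffle=False)
unshuffled_rank2 = shard_indices(10, 4, 2, shuffle=False)
print(shards)
print(unshuffled_rank0, unshuffled_rank2)
```

With 10 samples and 4 ranks every rank receives 3 indices, so two samples appear twice in the epoch. The padding keeps the number of steps identical on all ranks, which matters because each step involves a collective gradient exchange.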

train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size_per_gpu, shuffle=False, num_workers=0, pin_memory=True, sampler=train_sampler): Create a DataLoader object that feeds the data to the model in batches. This matches the usual training setup, except that a DistributedSampler is added to sample the data in a distributed manner.
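The script derives the per-GPU batch size with integer division, so the effective global batch can be slightly smaller than requested when the GPU count does not divide it. The arithmetic can be sketched as follows, assuming the 50,000 CIFAR10 training images and the default padding and last-batch behaviour of the sampler and DataLoader; `per_gpu_plan` is a hypothetical helper for this illustration.

```python
import math

CIFAR10_TRAIN_SIZE = 50_000  # number of training images in CIFAR10


def per_gpu_plan(global_batch, world_size, dataset_len=CIFAR10_TRAIN_SIZE):
    """Return (per-GPU batch size, effective global batch, steps per epoch)."""
    per_gpu = global_batch // world_size                      # same integer division as the script
    effective = per_gpu * world_size                          # may be smaller than requested
    samples_per_rank = math.ceil(dataset_len / world_size)    # sampler pads by default
    steps = math.ceil(samples_per_rank / per_gpu)             # last partial batch is kept
    return per_gpu, effective, steps


for gpus in (1, 2, 3, 4, 6, 8):
    print(gpus, per_gpu_plan(128, gpus))
```

For the default batch size of 128, 3 GPUs give 42 samples per GPU (126 overall) and 397 steps per epoch, while 8 GPUs give 16 samples per GPU and 391 steps per epoch.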

Train the model for the specified number of epochs. During loss.backward(), DistributedDataParallel averages the gradients across all processes, so optimizer.step() applies the same update to every replica.

Rank 0 saves a checkpoint at the end of each epoch.
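Since the checkpoint is saved from the wrapped model, every key in the saved state dict starts with `module.`, the attribute under which DistributedDataParallel stores the original network. That works when loading back into a wrapped model, as the script does, but a plain, unwrapped ResNet50 expects keys without the prefix. A minimal sketch of the renaming, using a hypothetical `strip_ddp_prefix` helper and plain values in place of tensors:

```python
from collections import OrderedDict


def strip_ddp_prefix(state_dict, prefix="module."):
    """Return a copy of state_dict with the DDP wrapper prefix removed from keys."""
    cleaned = OrderedDict()
    for key, value in state_dict.items():
        new_key = key[len(prefix):] if key.startswith(prefix) else key
        cleaned[new_key] = value
    return cleaned


# Stand-in for the result of torch.load(...): plain values instead of tensors
saved = OrderedDict([("module.conv1.weight", 1), ("module.fc.bias", 2)])
plain = strip_ddp_prefix(saved)
print(list(plain))
```

An alternative is to save `ddp_model.module.state_dict()` in the first place, which produces keys without the prefix.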

Rank 0 prints the loss and the data loading and training times every 10 batches.

At the end of training, rank 0 also prints the total training time.

Code test

ResNet50 was trained on CIFAR10 using 1 node with 1, 2, 3, and 4 GPUs, and 2 nodes with 6 and 8 GPUs (3 and 4 GPUs per node, respectively); the results are shown in the figure below. The batch size was kept the same in every test, and the time taken to complete each test was recorded in seconds. As the number of GPUs increases, the time required to complete the test decreases. The 8-GPU run took 320 seconds, the fastest time recorded. This is expected, but the training speed does not increase linearly with the number of GPUs. This may be because ResNet50 is a relatively small model, so there is too little computation per GPU to benefit fully from parallel training.

[Figure: time in seconds to train ResNet50 on CIFAR10 versus the number of GPUs]
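Sub-linear scaling of this kind is usually quantified as speedup and parallel efficiency relative to the smallest configuration. The sketch below shows the calculation; `scaling_report` is a hypothetical helper, and the timings passed to it are placeholder values chosen for the example, not the measurements reported in this article.

```python
def scaling_report(times_by_gpus):
    """Map GPU count -> (speedup, parallel efficiency) relative to the smallest GPU count."""
    base_gpus = min(times_by_gpus)
    base_time = times_by_gpus[base_gpus]
    report = {}
    for gpus, seconds in sorted(times_by_gpus.items()):
        speedup = base_time / seconds
        ideal = gpus / base_gpus          # speedup under perfect linear scaling
        report[gpus] = (speedup, speedup / ideal)
    return report


# Placeholder timings in seconds, NOT the measurements from this article
example = {1: 1000.0, 2: 600.0, 4: 400.0}
report = scaling_report(example)
for gpus, (speedup, eff) in report.items():
    print(f"{gpus} GPUs: speedup {speedup:.2f}x, efficiency {eff:.0%}")
```

An efficiency of 100% would mean perfectly linear scaling; values that drop as GPUs are added point to overheads such as gradient communication taking a growing share of each step.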

Using data parallelism on multiple GPUs can significantly reduce the time required to train a deep neural network (DNN) on a given dataset. As the number of GPUs increases, the time required to complete the training process decreases, indicating that DNNs can be trained more efficiently in parallel.

This approach is particularly useful when dealing with large datasets or complex DNN architectures. By leveraging multiple GPUs, the training process can be accelerated, allowing for faster model iteration and experimentation. However, the performance improvements achieved through data parallelism may be limited by factors such as communication overhead and GPU memory limits, and careful tuning is required to obtain the best results.


Statement:
This article is reproduced from 51cto.com. If there is any infringement, please contact admin@php.cn to have it deleted.