>  기사  >  기술 주변기기  >  PyTorch 병렬 훈련 DistributedDataParallel 전체 코드 예제

PyTorch 병렬 훈련 DistributedDataParallel 전체 코드 예제

WBOY
WBOY앞으로
2023-04-10 20:51:011268검색

대규모 데이터 세트를 사용하여 대규모 심층 신경망(DNN)을 훈련시키는 문제는 딥 러닝 분야의 주요 과제입니다. DNN 및 데이터 세트 크기가 증가함에 따라 이러한 모델을 교육하는 데 필요한 계산 및 메모리 요구 사항도 늘어납니다. 이로 인해 컴퓨팅 리소스가 제한된 단일 시스템에서 이러한 모델을 교육하는 것이 어렵거나 불가능해집니다. 대규모 데이터 세트를 사용하여 대규모 DNN을 교육하는 데 따른 몇 가지 주요 과제는 다음과 같습니다.

  • 긴 교육 시간: 모델의 복잡성과 데이터 세트의 크기에 따라 교육 프로세스를 완료하는 데 몇 주 또는 몇 달이 걸릴 수 있습니다.
  • 메모리 제한: 대규모 DNN은 훈련 중에 모든 모델 매개변수, 기울기 및 중간 활성화를 저장하기 위해 많은 양의 메모리가 필요할 수 있습니다. 이로 인해 메모리 부족 오류가 발생하고 단일 머신에서 학습할 수 있는 모델 크기가 제한될 수 있습니다.

이러한 문제를 해결하기 위해 모델 병렬 처리, 데이터 병렬 처리, 하이브리드 병렬 처리는 물론 하드웨어, 소프트웨어, 알고리즘 최적화를 포함하여 대규모 데이터 세트가 포함된 대규모 DNN의 교육을 확장하기 위한 다양한 기술이 개발되었습니다.

이 기사에서는 PyTorch를 사용하여 데이터 병렬성과 모델 병렬성을 보여줍니다.

PyTorch 병렬 훈련 DistributedDataParallel 전체 코드 예제

우리가 병렬성이라고 부르는 것은 일반적으로 훈련 시간을 단축하기 위해 여러 GPU 또는 여러 기계에서 심층 신경망(dnn)을 훈련하는 것을 의미합니다. 데이터 병렬화의 기본 아이디어는 훈련 데이터를 더 작은 덩어리로 분할하고 각 GPU 또는 기계가 별도의 데이터 덩어리를 처리하도록 하는 것입니다. 그런 다음 각 노드의 결과를 결합하여 모델 매개변수를 업데이트하는 데 사용합니다. 데이터 병렬 처리에서 모델 아키텍처는 각 노드에서 동일하지만 모델 매개변수는 노드 간에 분할됩니다. 각 노드는 할당된 데이터 청크를 사용하여 자체 로컬 모델을 훈련하고, 각 훈련 반복이 끝나면 모델 매개변수가 모든 노드에서 동기화됩니다. 이 과정은 모델이 만족스러운 결과로 수렴될 때까지 반복됩니다.

아래에서는 전체 코드 예제를 위해 ResNet50 및 CIFAR10 데이터 세트를 사용합니다.

데이터 병렬 처리에서 모델 아키텍처는 각 노드에서 동일하게 유지되지만 모델 매개변수는 노드 간에 분할되며 각 노드는 할당 데이터 청크를 사용하여 교육합니다. 자신의 지역 모델.

PyTorch의 DistributedDataParallel 라이브러리는 노드 전반에 걸쳐 기울기와 모델 매개변수를 효율적으로 전달하고 동기화하여 분산 교육을 달성할 수 있습니다. 이 기사에서는 ResNet50 및 CIFAR10 데이터 세트를 사용하여 PyTorch로 데이터 병렬 처리를 구현하는 방법에 대한 예를 제공합니다. 여기서 코드는 여러 GPU 또는 머신에서 실행되며 각 머신은 훈련 데이터의 하위 집합을 처리합니다. 훈련 프로세스는 PyTorch의 DistributedDataParallel 라이브러리를 사용하여 병렬화됩니다.

필요한 라이브러리를 가져옵니다

import os
 from datetime import datetime
 from time import time
 import argparse
 import torchvision
 import torchvision.transforms as transforms
 import torch
 import torch.nn as nn
 import torch.distributed as dist
 from torch.nn.parallel import DistributedDataParallel

다음으로 GPU를 확인하겠습니다.

import subprocess
 result = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE)
 print(result.stdout.decode())

여러 서버에서 실행해야 하기 때문에 수동으로 하나씩 실행하는 것은 실용적이지 않으므로 스케줄러가 필요합니다. 여기서는 SLURM 파일을 사용하여 코드를 실행합니다(slurmLinux 및 Unix 계열 커널용 무료 및 오픈 소스 작업 스케줄러).

def main():
 
 # get distributed configuration from Slurm environment
 
 parser = argparse.ArgumentParser()
 parser.add_argument('-b', '--batch-size', default=128, type =int,
 help='batch size. it will be divided in mini-batch for each worker')
 parser.add_argument('-e','--epochs', default=2, type=int, metavar='N',
 help='number of total epochs to run')
 parser.add_argument('-c','--checkpoint', default=None, type=str,
 help='path to checkpoint to load')
 args = parser.parse_args()
 
 rank = int(os.environ['SLURM_PROCID'])
 local_rank = int(os.environ['SLURM_LOCALID'])
 size = int(os.environ['SLURM_NTASKS'])
 master_addr = os.environ["SLURM_SRUN_COMM_HOST"]
 port = "29500"
 node_id = os.environ['SLURM_NODEID']
 ddp_arg = [rank, local_rank, size, master_addr, port, node_id]
 train(args, ddp_arg)

그런 다음 DistributedDataParallel 라이브러리를 사용하여 분산 교육을 수행합니다.

def train(args, ddp_arg):
 
 rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg
 
 # display info
 if rank == 0:
 #print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR)
 print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR)
 #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
 
 print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID))
 
 
 # configure distribution method: define address and port of the master node and initialise communication backend (NCCL)
 #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank)
 dist.init_process_group(
 backend='nccl',
 init_method='tcp://{}:{}'.format(MASTER_ADDR, port),
 world_size=size,
 rank=rank
)
 
 # distribute model
 torch.cuda.set_device(local_rank)
 gpu = torch.device("cuda")
 #model = ResNet18(classes=10).to(gpu)
 model = torchvision.models.resnet50(pretrained=False).to(gpu)
 ddp_model = DistributedDataParallel(model, device_ids=[local_rank])
 if args.checkpoint is not None:
 map_location = {'cuda:%d' % 0: 'cuda:%d' % local_rank}
 ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location))
 
 # distribute batch size (mini-batch)
 batch_size = args.batch_size
 batch_size_per_gpu = batch_size // size
 
 # define loss function (criterion) and optimizer
 criterion = nn.CrossEntropyLoss()
 optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4)
 
 
 transform_train = transforms.Compose([
 transforms.RandomCrop(32, padding=4),
 transforms.RandomHorizontalFlip(),
 transforms.ToTensor(),
 transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
 
 # load data with distributed sampler
 #train_dataset = torchvision.datasets.CIFAR10(root='./data',
 # train=True,
 # transform=transform_train,
 # download=False)
 
 # load data with distributed sampler
 train_dataset = torchvision.datasets.CIFAR10(root='./data',
train=True,
transform=transform_train,
download=False)
 
 train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset,
 num_replicas=size,
 rank=rank)
 
 train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
batch_size=batch_size_per_gpu,
shuffle=False,
num_workers=0,
pin_memory=True,
sampler=train_sampler)
 
 # training (timers and display handled by process 0)
 if rank == 0: start = datetime.now()
 total_step = len(train_loader)
 
 for epoch in range(args.epochs):
 if rank == 0: start_dataload = time()
 
 for i, (images, labels) in enumerate(train_loader):
 
 # distribution of images and labels to all GPUs
 images = images.to(gpu, non_blocking=True)
 labels = labels.to(gpu, non_blocking=True)
 
 if rank == 0: stop_dataload = time()
 
 if rank == 0: start_training = time()
 
 # forward pass
 outputs = ddp_model(images)
 loss = criterion(outputs, labels)
 
 # backward and optimize
 optimizer.zero_grad()
 loss.backward()
 optimizer.step()
 
 if rank == 0: stop_training = time()
 if (i + 1) % 10 == 0 and rank == 0:
 print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs,
 i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000,
(stop_training - start_training)*1000))
 if rank == 0: start_dataload = time()
 
 #Save checkpoint at every end of epoch
 if rank == 0:
 torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1))
 
 if rank == 0:
 print(">>> Training complete in: " + str(datetime.now() - start))
 
 
 if __name__ == '__main__':
 
 main()

이 코드는 데이터와 모델을 여러 GPU에 분할하고 분산 방식으로 모델을 업데이트합니다. 다음은 코드에 대한 몇 가지 설명입니다.

train(args, ddp_arg)에는 args와 ddp_arg라는 두 개의 매개변수가 있습니다. 여기서 args는 스크립트에 전달된 명령줄 매개변수이고 ddp_arg에는 분산 훈련 관련 매개변수가 포함되어 있습니다.

rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg: ddp_arg에 분산 훈련 관련 매개변수의 압축을 풉니다.

순위가 0이면 현재 사용 중인 GPU 수와 마스터 노드 IP 주소 정보를 출력합니다.

dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, Rank=rank): NCCL 백엔드 그룹을 사용하여 분산 프로세스를 초기화합니다.

torch.cuda.set_device(local_rank): 이 프로세스에 대해 지정된 GPU를 선택합니다.

model = torchvision.models.ResNet50 (pretrained=False).to(gpu): torchvision 모델에서 ResNet50 모델을 로드하고 지정된 GPU로 이동합니다.

ddp_model = DistributedDataParallel(model, device_ids=[local_rank]): DistributedDataParallel 모듈에 모델을 래핑합니다. 이는 분산 교육을 수행할 수 있음을 의미합니다.

CIFAR-10 데이터 세트를 로드하고 데이터 향상 변환을 적용합니다.

train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank): DistributedSampler 객체를 생성하여 데이터 세트를 여러 GPU로 분할합니다.

train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler): DataLoader 객체를 생성하면 데이터가 이는 분산 데이터 샘플링 DistributedSampler가 추가된다는 점을 제외하면 일반적인 교육 단계와 일치합니다.

지정된 에포크 수에 대해 모델을 훈련하고 분산 방식으로 Optimizer.step()을 사용하여 가중치를 업데이트합니다.

rank0은 각 라운드가 끝날 때마다 체크포인트를 저장합니다.

rank0은 10개의 배치마다 손실과 훈련 시간을 보여줍니다.

훈련이 끝나면 훈련 모델을 인쇄하는 데 소요된 총 시간도 순위 0입니다.

코드 테스트

는 1/2/3/4 GPU가 있는 1개의 노드, 6/8 GPU가 있는 2개의 노드, 그리고 3/4 GPU가 있는 각 노드를 사용하여 훈련되었습니다. Cifar10에서 Resnet50의 테스트는 다음과 같습니다. , 배치 크기는 각 테스트마다 동일하게 유지됩니다. 각 테스트를 완료하는 데 걸린 시간은 초 단위로 기록되었습니다. 사용되는 GPU 수가 증가할수록 테스트를 완료하는 데 필요한 시간이 감소합니다. 8개의 GPU를 사용했을 때 완료하는데 320초가 걸렸는데, 이는 기록된 가장 빠른 시간이다. 이는 확실하지만 GPU 수의 증가에 따라 훈련 속도가 선형적으로 증가하지 않는다는 것을 알 수 있습니다. 이는 Resnet50이 상대적으로 작은 모델이고 병렬 훈련이 필요하지 않기 때문일 수 있습니다.

PyTorch 병렬 훈련 DistributedDataParallel 전체 코드 예제

여러 GPU에서 데이터 병렬성을 사용하면 특정 데이터 세트에서 심층 신경망(DNN)을 훈련하는 데 필요한 시간을 크게 줄일 수 있습니다. GPU 수가 증가하면 훈련 프로세스를 완료하는 데 필요한 시간이 감소하여 DNN을 병렬로 보다 효율적으로 훈련할 수 있음을 나타냅니다.

이 접근 방식은 대규모 데이터 세트나 복잡한 DNN 아키텍처를 처리할 때 특히 유용합니다. 여러 GPU를 활용하면 훈련 프로세스가 가속화되어 더 빠른 모델 반복 및 실험이 가능해집니다. 그러나 데이터 병렬성을 통해 달성되는 성능 향상은 통신 오버헤드 및 GPU 메모리 제한과 같은 요인으로 인해 제한될 수 있으며 최상의 결과를 얻으려면 세심한 조정이 필요하다는 점에 유의해야 합니다.

위 내용은 PyTorch 병렬 훈련 DistributedDataParallel 전체 코드 예제의 상세 내용입니다. 자세한 내용은 PHP 중국어 웹사이트의 기타 관련 기사를 참조하세요!

성명:
이 기사는 51cto.com에서 복제됩니다. 침해가 있는 경우 admin@php.cn으로 문의하시기 바랍니다. 삭제