Heim >Technologie-Peripheriegeräte >KI >Vollständiges Codebeispiel für das parallele PyTorch-Training DistributedDataParallel
Das Problem, große tiefe neuronale Netze (DNN) mithilfe großer Datensätze zu trainieren, ist eine große Herausforderung im Bereich Deep Learning. Mit zunehmender DNN- und Datensatzgröße steigen auch die Rechen- und Speicheranforderungen für das Training dieser Modelle. Dies macht es schwierig oder sogar unmöglich, diese Modelle auf einer einzelnen Maschine mit begrenzten Rechenressourcen zu trainieren. Zu den größten Herausforderungen beim Training großer DNNs mithilfe großer Datensätze gehören:
Um diesen Herausforderungen zu begegnen, wurden verschiedene Techniken entwickelt, um das Training großer DNNs mit großen Datensätzen zu erweitern, einschließlich Modellparallelität, Datenparallelität und Hybridparallelität sowie Hardware-, Software- und Algorithmusoptimierung.
In diesem Artikel demonstrieren wir Datenparallelität und Modellparallelität mit PyTorch.
Was wir Parallelität nennen, bezieht sich im Allgemeinen auf das Training tiefer neuronaler Netze (DNN) auf mehreren GPUs oder mehreren Maschinen, um weniger Trainingszeit zu erreichen. Die Grundidee der Datenparallelität besteht darin, die Trainingsdaten in kleinere Blöcke aufzuteilen und jede GPU oder Maschine einen separaten Datenblock verarbeiten zu lassen. Die Ergebnisse für jeden Knoten werden dann kombiniert und zur Aktualisierung der Modellparameter verwendet. Bei der Datenparallelität ist die Modellarchitektur auf jedem Knoten gleich, die Modellparameter werden jedoch zwischen den Knoten aufgeteilt. Jeder Knoten trainiert sein eigenes lokales Modell mithilfe zugewiesener Datenblöcke, und am Ende jeder Trainingsiteration werden die Modellparameter über alle Knoten hinweg synchronisiert. Dieser Vorgang wird wiederholt, bis das Modell zu einem zufriedenstellenden Ergebnis konvergiert.
Im Folgenden verwenden wir die ResNet50- und CIFAR10-Datensätze für ein vollständiges Codebeispiel:
Bei der Datenparallelität bleibt die Modellarchitektur auf jedem Knoten gleich, aber die Modellparameter werden zwischen Knoten aufgeteilt, und jeder Knoten verwendet Datenblöcke, um Datenblöcke zuzuweisen Trainieren Sie Ihr eigenes lokales Modell.
Die DistributedDataParallel-Bibliothek von PyTorch kann Verläufe und Modellparameter effizient über Knoten hinweg kommunizieren und synchronisieren, um ein verteiltes Training zu erreichen. Dieser Artikel enthält Beispiele für die Implementierung von Datenparallelität mit PyTorch unter Verwendung der ResNet50- und CIFAR10-Datensätze, wobei der Code auf mehreren GPUs oder Maschinen ausgeführt wird, wobei jede Maschine eine Teilmenge der Trainingsdaten verarbeitet. Der Trainingsprozess wird mithilfe der DistributedDataParallel-Bibliothek von PyTorch parallelisiert.
import os from datetime import datetime from time import time import argparse import torchvision import torchvision.transforms as transforms import torch import torch.nn as nn import torch.distributed as dist from torch.nn.parallel import DistributedDataParallel
Als nächstes überprüfen wir die GPU.
import subprocess result = subprocess.run(['nvidia-smi'], stdout=subprocess.PIPE) print(result.stdout.decode())
Da wir auf mehreren Servern laufen müssen, ist es nicht praktikabel, sie einzeln manuell auszuführen, daher ist ein Planer erforderlich. Hier verwenden wir SLURM-Dateien, um den Code auszuführen (slurmkostenloser und Open-Source-Jobplaner für Linux- und Unix-ähnliche Kernel),
def main(): # get distributed configuration from Slurm environment parser = argparse.ArgumentParser() parser.add_argument('-b', '--batch-size', default=128, type =int, help='batch size. it will be divided in mini-batch for each worker') parser.add_argument('-e','--epochs', default=2, type=int, metavar='N', help='number of total epochs to run') parser.add_argument('-c','--checkpoint', default=None, type=str, help='path to checkpoint to load') args = parser.parse_args() rank = int(os.environ['SLURM_PROCID']) local_rank = int(os.environ['SLURM_LOCALID']) size = int(os.environ['SLURM_NTASKS']) master_addr = os.environ["SLURM_SRUN_COMM_HOST"] port = "29500" node_id = os.environ['SLURM_NODEID'] ddp_arg = [rank, local_rank, size, master_addr, port, node_id] train(args, ddp_arg)
Dann verwenden wir die DistributedDataParallel-Bibliothek, um verteiltes Training durchzuführen.
def train(args, ddp_arg): rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg # display info if rank == 0: #print(">>> Training on ", len(hostnames), " nodes and ", size, " processes, master node is ", MASTER_ADDR) print(">>> Training on ", size, " GPUs, master node is ", MASTER_ADDR) #print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID)) print("- Process {} corresponds to GPU {} of node {}".format(rank, local_rank, NODE_ID)) # configure distribution method: define address and port of the master node and initialise communication backend (NCCL) #dist.init_process_group(backend='nccl', init_method='env://', world_size=size, rank=rank) dist.init_process_group( backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank ) # distribute model torch.cuda.set_device(local_rank) gpu = torch.device("cuda") #model = ResNet18(classes=10).to(gpu) model = torchvision.models.resnet50(pretrained=False).to(gpu) ddp_model = DistributedDataParallel(model, device_ids=[local_rank]) if args.checkpoint is not None: map_location = {'cuda:%d' % 0: 'cuda:%d' % local_rank} ddp_model.load_state_dict(torch.load(args.checkpoint, map_location=map_location)) # distribute batch size (mini-batch) batch_size = args.batch_size batch_size_per_gpu = batch_size // size # define loss function (criterion) and optimizer criterion = nn.CrossEntropyLoss() optimizer = torch.optim.SGD(ddp_model.parameters(), 1e-4) transform_train = transforms.Compose([ transforms.RandomCrop(32, padding=4), transforms.RandomHorizontalFlip(), transforms.ToTensor(), transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)), ]) # load data with distributed sampler #train_dataset = torchvision.datasets.CIFAR10(root='./data', # train=True, # transform=transform_train, # download=False) # load data with distributed sampler train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, transform=transform_train, download=False) train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset, num_replicas=size, rank=rank) train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=batch_size_per_gpu, shuffle=False, num_workers=0, pin_memory=True, sampler=train_sampler) # training (timers and display handled by process 0) if rank == 0: start = datetime.now() total_step = len(train_loader) for epoch in range(args.epochs): if rank == 0: start_dataload = time() for i, (images, labels) in enumerate(train_loader): # distribution of images and labels to all GPUs images = images.to(gpu, non_blocking=True) labels = labels.to(gpu, non_blocking=True) if rank == 0: stop_dataload = time() if rank == 0: start_training = time() # forward pass outputs = ddp_model(images) loss = criterion(outputs, labels) # backward and optimize optimizer.zero_grad() loss.backward() optimizer.step() if rank == 0: stop_training = time() if (i + 1) % 10 == 0 and rank == 0: print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}, Time data load: {:.3f}ms, Time training: {:.3f}ms'.format(epoch + 1, args.epochs, i + 1, total_step, loss.item(), (stop_dataload - start_dataload)*1000, (stop_training - start_training)*1000)) if rank == 0: start_dataload = time() #Save checkpoint at every end of epoch if rank == 0: torch.save(ddp_model.state_dict(), './checkpoint/{}GPU_{}epoch.checkpoint'.format(size, epoch+1)) if rank == 0: print(">>> Training complete in: " + str(datetime.now() - start)) if __name__ == '__main__': main()
Der Code teilt die Daten und das Modell auf mehrere GPUs auf und aktualisiert das Modell auf verteilte Weise. Hier sind einige Erläuterungen zum Code:
train(args, ddp_arg) hat zwei Parameter, args und ddp_arg, wobei args der an das Skript übergebene Befehlszeilenparameter ist und ddp_arg Parameter für das verteilte Training enthält.
rank, local_rank, size, MASTER_ADDR, port, NODE_ID = ddp_arg: Entpacken Sie die Parameter für das verteilte Training in ddp_arg.
Wenn der Rang 0 ist, drucken Sie die Anzahl der aktuell verwendeten GPUs und die IP-Adressinformationen des Masterknotens aus.
dist.init_process_group(backend='nccl', init_method='tcp://{}:{}'.format(MASTER_ADDR, port), world_size=size, rank=rank) : Verteilten Prozess mithilfe der NCCL-Backend-Gruppe initialisieren.
torch.cuda.set_device(local_rank): Wählen Sie die angegebene GPU für diesen Prozess aus.
model = Torchvision.models. ResNet50 (pretrained=False).to(gpu): Laden Sie das ResNet50-Modell vom Torchvision-Modell und verschieben Sie es auf die angegebene GPU.
ddp_model = DistributedDataParallel(model, device_ids=[local_rank]): Wickeln Sie das Modell in das DistributedDataParallel-Modul ein, was bedeutet, dass wir verteiltes Training durchführen können
Laden Sie den CIFAR-10-Datensatz und wenden Sie die Datenverbesserungstransformation an.
train_sampler=torch.utils.data.distributed.DistributedSampler(train_dataset,num_replicas=size,rank=rank): Erstellen Sie ein DistributedSampler-Objekt, um den Datensatz auf mehrere GPUs aufzuteilen.
train_loader =torch.utils.data.DataLoader(dataset=train_dataset,batch_size=batch_size_per_gpu,shuffle=False,num_workers=0,pin_memory=True,sampler=train_sampler): Erstellen Sie ein DataLoader-Objekt und die Daten werden in das geladen Modell in Stapeln Dies entspricht unseren üblichen Trainingsschritten, mit der Ausnahme, dass ein DistributedSampler für die verteilte Datenabtastung hinzugefügt wird.
Trainieren Sie das Modell für die angegebene Anzahl von Epochen und aktualisieren Sie die Gewichte mithilfe von „optimierer.step()“ auf verteilte Weise.
Rang0 speichert am Ende jeder Runde einen Kontrollpunkt.
Rang0 zeigt Verlust und Trainingszeit alle 10 Chargen an.
Am Ende des Trainings liegt die Gesamtzeit, die für das Drucken des Trainingsmodells aufgewendet wurde, ebenfalls auf Rang 0.
wurde mit 1 Knoten mit 1/2/3/4 GPUs, 2 Knoten mit 6/8 GPUs und jedem Knoten mit 3/4 GPUs trainiert. Der Test von Resnet50 auf Cifar10 ist wie unten gezeigt , die Chargengröße bleibt für jeden Test gleich. Die für die Durchführung jedes Tests benötigte Zeit wurde in Sekunden aufgezeichnet. Mit zunehmender Anzahl der verwendeten GPUs nimmt die für die Durchführung des Tests erforderliche Zeit ab. Bei Verwendung von 8 GPUs dauerte der Vorgang 320 Sekunden, was die schnellste aufgezeichnete Zeit ist. Dies ist sicher, aber wir können sehen, dass die Trainingsgeschwindigkeit nicht linear mit der Erhöhung der Anzahl der GPUs zunimmt. Dies kann daran liegen, dass Resnet50 ein relativ kleines Modell ist und kein paralleles Training erfordert.
Durch die Verwendung von Datenparallelität auf mehreren GPUs kann die Zeit, die zum Trainieren eines tiefen neuronalen Netzwerks (DNN) für einen bestimmten Datensatz erforderlich ist, erheblich verkürzt werden. Mit zunehmender Anzahl an GPUs verringert sich die für den Abschluss des Trainingsprozesses erforderliche Zeit, was darauf hindeutet, dass DNNs parallel effizienter trainiert werden können.
Dieser Ansatz ist besonders nützlich, wenn es um große Datensätze oder komplexe DNN-Architekturen geht. Durch die Nutzung mehrerer GPUs kann der Trainingsprozess beschleunigt werden, was eine schnellere Modelliteration und Experimente ermöglicht. Es ist jedoch zu beachten, dass die durch Datenparallelität erzielte Leistungsverbesserung durch Faktoren wie Kommunikationsaufwand und GPU-Speicherbeschränkungen begrenzt sein kann und eine sorgfältige Abstimmung erfordert, um die besten Ergebnisse zu erzielen.
Das obige ist der detaillierte Inhalt vonVollständiges Codebeispiel für das parallele PyTorch-Training DistributedDataParallel. Für weitere Informationen folgen Sie bitte anderen verwandten Artikeln auf der PHP chinesischen Website!