PyTorch Lightning: Scaling Deep Learning Workflows with Distributed Training

Anay Dongre
Jun 24, 2023

PyTorch Lightning is a popular open-source framework built on top of PyTorch that aims to simplify and streamline the process of developing deep learning models.

Image from Medium

Introduction

Welcome to the second article in our series on PyTorch Lightning! In the previous article, we introduced you to PyTorch Lightning and explored its key features and benefits for simplifying the development of deep learning models. We learned how PyTorch Lightning provides a high-level abstraction for organizing and structuring PyTorch code, allowing researchers and practitioners to focus more on the model design and experimentation rather than boilerplate code.

In this article, we will delve deeper into PyTorch Lightning and explore how it enables scaling of deep learning workflows with distributed training. Distributed training is essential for training large models on massive datasets, as it allows us to leverage the power of multiple GPUs or machines to accelerate the training process. However, distributed training often comes with its own set of challenges and complexities.
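
The core idea behind data-parallel training, which Lightning's DDP strategy builds on, is that each replica computes gradients on its own shard of the batch and those gradients are then averaged. For equal-sized shards and a mean-reduced loss, this gives the same gradient as one large batch. The single-process sketch below simulates two replicas with plain PyTorch. It does not use torch.distributed, and the tiny linear model, random data, and helper names are illustrative placeholders.

```python
import torch
from torch import nn

def full_batch_grad(model: nn.Module, x: torch.Tensor, y: torch.Tensor) -> torch.Tensor:
    """Gradient of the mean cross-entropy loss over the whole batch (weight only)."""
    model.zero_grad()
    nn.functional.cross_entropy(model(x), y).backward()
    return model.weight.grad.clone()

def simulated_ddp_grad(model: nn.Module, x: torch.Tensor, y: torch.Tensor, world_size: int) -> torch.Tensor:
    """Split the batch into equal shards, compute each shard's gradient, then average them."""
    grads = []
    for x_shard, y_shard in zip(x.chunk(world_size), y.chunk(world_size)):
        model.zero_grad()
        nn.functional.cross_entropy(model(x_shard), y_shard).backward()
        grads.append(model.weight.grad.clone())
    # Averaging plays the role of DDP's all-reduce across processes
    return torch.stack(grads).mean(dim=0)

torch.manual_seed(0)
model = nn.Linear(8, 3)
x = torch.randn(16, 8)
y = torch.randint(0, 3, (16,))

g_full = full_batch_grad(model, x, y)
g_ddp = simulated_ddp_grad(model, x, y, world_size=2)
print(torch.allclose(g_full, g_ddp, atol=1e-6))  # True for equal-sized shards
```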

Installing PyTorch Lightning & Torchvision

```bash
pip install torch torchvision pytorch-lightning
```

Implementation

First, we need to import the necessary modules from PyTorch and PyTorch Lightning:

```python
import torch
from torch import nn
from torch.utils.data import DataLoader
from torchvision.datasets import CIFAR10
from torchvision import transforms

import pytorch_lightning as pl
```

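Next, we define our neural network architecture. The Net class subclasses pl.LightningModule, which itself extends PyTorch's nn.Module, so the layers are declared exactly as in plain PyTorch. In this example, we use a simple convolutional neural network with two convolutional layers and three fully connected layers: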

```python
class Net(pl.LightningModule):
    def __init__(self):
        super(Net, self).__init__()

        self.conv1 = nn.Conv2d(3, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, 10)

    def forward(self, x):
        x = self.pool(nn.functional.relu(self.conv1(x)))
        x = self.pool(nn.functional.relu(self.conv2(x)))
        x = torch.flatten(x, 1)
        x = nn.functional.relu(self.fc1(x))
        x = nn.functional.relu(self.fc2(x))
        x = self.fc3(x)
        return x
```
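
The 16 * 5 * 5 input size of fc1 follows from how the 32x32 CIFAR-10 images shrink as they pass through the convolution and pooling layers. The hypothetical helper conv_out below applies the standard output-size formula for layers without dilation, which makes the arithmetic explicit:

```python
def conv_out(size: int, kernel: int, stride: int = 1, padding: int = 0) -> int:
    """Spatial output size of a Conv2d or MaxPool2d layer (dilation 1, floor rounding)."""
    return (size + 2 * padding - kernel) // stride + 1

size = 32                          # CIFAR-10 images are 32x32
size = conv_out(size, 5)           # conv1, kernel 5 -> 28
size = conv_out(size, 2, 2)        # pool, 2x2 with stride 2 -> 14
size = conv_out(size, 5)           # conv2, kernel 5 -> 10
size = conv_out(size, 2, 2)        # pool -> 5
flat_features = 16 * size * size   # conv2 outputs 16 channels
print(size, flat_features)         # 5 400
```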

We then define the training and validation steps for our LightningModule. In the training_step method, we unpack a batch into inputs x and labels y, pass the inputs through the network to get the logits, compute the cross-entropy loss, and log it as train_loss with the self.log method; Lightning uses the returned loss for backpropagation. The validation_step method does the same but logs the loss as val_loss, and Lightning runs it without gradient tracking or optimizer updates:

```python
    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = nn.functional.cross_entropy(logits, y)
        self.log("train_loss", loss)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = nn.functional.cross_entropy(logits, y)
        self.log("val_loss", loss)
        return loss
```
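
nn.functional.cross_entropy expects raw logits (which is why forward applies no softmax) and integer class labels. It applies log-softmax internally and averages the negative log-probability of the correct class over the batch. A quick check with made-up logits:

```python
import torch
from torch import nn

logits = torch.tensor([[2.0, 0.5, -1.0],
                       [0.1, 0.2, 3.0]])  # 2 samples, 3 classes (made-up values)
labels = torch.tensor([0, 2])             # correct class for each sample

builtin = nn.functional.cross_entropy(logits, labels)

# Manual equivalent: log-softmax, pick the correct class, negate, average over the batch
log_probs = torch.log_softmax(logits, dim=1)
manual = -log_probs[torch.arange(len(labels)), labels].mean()

print(builtin.item(), manual.item())
```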

We also define our optimizer and learning rate scheduler in the configure_optimizers method. With step_size=1 and gamma=0.1, StepLR divides the learning rate by 10 after every epoch:

```python
    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=0.001)
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)
        return [optimizer], [scheduler]
```
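
To see how aggressive this schedule is, the sketch below steps the same Adam/StepLR pair for the 5 epochs used later in the Trainer, with a placeholder parameter standing in for the model. By the last epoch the learning rate has dropped to about 1e-7, so a larger step_size or gamma may be worth trying; that tuning is a suggestion, not something the original setup specifies.

```python
import torch

param = torch.nn.Parameter(torch.zeros(1))  # placeholder parameter (no real model)
optimizer = torch.optim.Adam([param], lr=0.001)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)

lrs = []
for epoch in range(5):  # max_epochs=5, as in the Trainer configuration
    lrs.append(optimizer.param_groups[0]["lr"])
    optimizer.step()    # a real epoch would run many optimizer steps here
    scheduler.step()    # Lightning steps epoch-interval schedulers once per epoch by default

print(lrs)  # approximately 1e-3, 1e-4, 1e-5, 1e-6, 1e-7
```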

Next, we define the data loading and preprocessing steps, also as methods of Net, using PyTorch's DataLoader and torchvision's transforms:

```python
    def prepare_data(self):
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

        CIFAR10(root='./data', train=True, download=True, transform=transform)
        CIFAR10(root='./data', train=False, download=True, transform=transform)

    def train_dataloader(self):
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

        train_dataset = CIFAR10(root='./data', train=True, download=False, transform=transform)
        return DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=8)

    def val_dataloader(self):
        transform = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
        ])

        val_dataset = CIFAR10(root='./data', train=False, download=False, transform=transform)
        return DataLoader(val_dataset, batch_size=64, shuffle=False, num_workers=8)
```

  1. prepare_data(self): Downloads the CIFAR-10 training and test splits to the './data' directory. By default, Lightning calls this hook from a single process on each node rather than from every process, which makes it the right place for downloads. The transform passed here is never actually used: the dataset objects are discarded after downloading, and torchvision applies transforms lazily, only when samples are accessed.
  2. train_dataloader(self): Defines the same ToTensor and Normalize transformations, loads the CIFAR-10 training split from './data' (with download=False, since prepare_data has already fetched it), and wraps it in a DataLoader with a batch size of 64, shuffling enabled, and 8 worker processes.
  3. val_dataloader(self): Follows the same structure for the CIFAR-10 test split, which serves as the validation set here. Its DataLoader also uses a batch size of 64 and 8 workers but does not shuffle the data.
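
The Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5)) step computes (x - 0.5) / 0.5 for each channel, mapping the [0, 1] output of ToTensor() to [-1, 1]. The sketch below reproduces that arithmetic on a plain tensor; the normalize helper is hypothetical and is not torchvision's implementation:

```python
import torch

def normalize(img: torch.Tensor, mean, std) -> torch.Tensor:
    """Per-channel (x - mean) / std on a C x H x W tensor, mirroring transforms.Normalize."""
    mean = torch.tensor(mean).view(-1, 1, 1)  # reshape so it broadcasts over H and W
    std = torch.tensor(std).view(-1, 1, 1)
    return (img - mean) / std

# A fake 3x2x2 "image" with values at the ends and middle of ToTensor()'s [0, 1] range
img = torch.tensor([[[0.0, 1.0], [0.5, 0.5]]] * 3)
out = normalize(img, (0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
print(out[0])  # values: [[-1, 1], [0, 0]]
```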

The evaluate_model function takes a trained model and evaluates it on the CIFAR-10 test split (the same split used for validation above). It first applies the same transformations to the test data, converting it to tensors and normalizing it. Then, it creates a data loader for the test dataset, moves the model to the GPU if one is available (otherwise the CPU), and sets cross-entropy as the evaluation criterion.

```python
def evaluate_model(model):
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
    ])

    test_dataset = CIFAR10(root='./data', train=False, download=True, transform=transform)
    test_loader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=8)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    criterion = nn.CrossEntropyLoss()

    model.eval()
    test_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for data in test_loader:
            inputs, labels = data
            inputs = inputs.to(device)
            labels = labels.to(device)

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            test_loss += loss.item()

            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    accuracy = 100.0 * correct / total
    average_loss = test_loss / len(test_loader)

    print(f"Test Loss: {average_loss:.4f}")
    print(f"Test Accuracy: {accuracy:.2f}%")
```

The model is put into evaluation mode, and variables for test loss, correct predictions, and total data points are initialized. Within a no-gradient context, the function iterates over the test data loader, forwarding batches of inputs through the model, calculating the loss, and accumulating the test loss. It also computes the number of correct predictions and the total number of data points. Finally, it calculates and prints the average test loss and the test accuracy.
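
The accuracy bookkeeping in evaluate_model can be checked by hand on a tiny batch. With the made-up logits below, torch.max(outputs, 1) returns the index of the largest logit in each row, three of the four predictions match the labels, and the accuracy comes out at 75%:

```python
import torch

# Made-up logits for 4 samples and 3 classes, plus their true labels
outputs = torch.tensor([[2.0, 0.1, 0.3],
                        [0.2, 1.5, 0.1],
                        [0.3, 0.2, 0.9],
                        [1.1, 0.4, 0.2]])
labels = torch.tensor([0, 1, 0, 0])

_, predicted = torch.max(outputs, 1)         # argmax over the class dimension
correct = (predicted == labels).sum().item()
total = labels.size(0)
accuracy = 100.0 * correct / total
print(predicted.tolist(), accuracy)          # [0, 1, 2, 0] 75.0
```

Note that average_loss averages the per-batch mean losses, so the final batch of 16 images (the 10,000 test images do not divide evenly by 64) carries the same weight as a full batch of 64.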

Finally, we instantiate our Net model and the Trainer from PyTorch Lightning, specifying the desired number of GPUs or machines for distributed training:

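```python
if __name__ == "__main__":
    net = Net()

    trainer = pl.Trainer(
        num_nodes=1,         # Number of machines in your distributed setup
        accelerator="auto",  # Hardware type: "auto", "cpu", "cuda", "hpu", "ipu", "mps", or "tpu"
        devices=1,           # Devices (e.g. GPUs) per machine; set > 1 for multi-GPU training
        # strategy="ddp",    # Uncomment to request Distributed Data Parallel explicitly
        max_epochs=5,
    )

    trainer.fit(net)

    # With multiple processes, every rank runs this code; evaluate on the main process only
    if trainer.is_global_zero:
        evaluate_model(net)
```
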
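  • num_nodes: It specifies the number of machines in a distributed setup. In this case, it is set to 1, indicating a single-machine setup.
  • accelerator: It determines the accelerator type for training. The value "auto" allows PyTorch Lightning to automatically select the appropriate accelerator based on the hardware and software environment. Other possible values include "cpu", "cuda", "hpu", "ipu", "mps", and "tpu", which correspond to specific hardware accelerators.
  • max_epochs: It sets the maximum number of epochs (complete passes through the training dataset) for training the model. In this case, it is set to 5.
  • devices: It specifies the number of devices (for example, GPUs) to use on each machine. Setting it to 1 trains on a single device; with a larger value, Lightning launches one training process per device. To train on the CPU, set accelerator="cpu".
  • strategy: It is not set in this example, so Lightning uses its default, which chooses a strategy based on the hardware and the number of devices. Setting strategy="ddp" explicitly requests Distributed Data Parallel, in which each process holds a full copy of the model and gradients are averaged across processes during every backward pass.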

These options allow you to control various aspects of the training process, such as distributed training, accelerator selection, and the number of epochs and devices used for training.
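
When training runs as several processes (world size = num_nodes × devices) under a DDP-style strategy, Lightning by default wraps the training dataloader's sampler in PyTorch's DistributedSampler so that each process sees a disjoint shard of the data. The sketch below builds such samplers directly, using plain PyTorch, to show the shard sizes and optimizer steps per epoch for the batch size of 64 used above; NUM_TRAIN, BATCH_SIZE, and shard_stats are illustrative names. Each optimizer step then covers an effective batch of 64 × world size images.

```python
import math
from torch.utils.data.distributed import DistributedSampler

NUM_TRAIN = 50_000  # number of CIFAR-10 training images
BATCH_SIZE = 64     # per-process batch size from train_dataloader

def shard_stats(world_size: int) -> tuple[int, int]:
    """Return (samples seen by one process, optimizer steps per epoch)."""
    # Passing num_replicas and rank explicitly avoids needing an initialized process group
    sampler = DistributedSampler(range(NUM_TRAIN), num_replicas=world_size, rank=0, shuffle=False)
    samples_per_rank = len(sampler)
    return samples_per_rank, math.ceil(samples_per_rank / BATCH_SIZE)

# Without shuffling, rank r receives every world_size-th index starting at r
rank0 = list(DistributedSampler(range(10), num_replicas=2, rank=0, shuffle=False))
rank1 = list(DistributedSampler(range(10), num_replicas=2, rank=1, shuffle=False))
print(rank0, rank1)  # [0, 2, 4, 6, 8] [1, 3, 5, 7, 9]

for world_size in (1, 2, 8):  # e.g. 1 GPU, 2 GPUs, 2 nodes x 4 GPUs
    print(world_size, shard_stats(world_size))
```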

Once everything is set up, we simply call the fit method of the Trainer object, passing in our Net model. We don't pass any dataloaders explicitly, because the Trainer calls the module's prepare_data, train_dataloader, and val_dataloader hooks itself.
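
To make clear what trainer.fit(net) saves us from writing, here is a heavily simplified plain-PyTorch loop covering only the training part. It is a sketch, not Lightning's actual implementation: the real Trainer also runs validation, logging, checkpointing, device placement, and distributed synchronization. The manual_fit name, the tiny linear model, and the synthetic data are hypothetical stand-ins for Net and CIFAR-10.

```python
import torch
from torch import nn
from torch.utils.data import DataLoader, TensorDataset

def manual_fit(model: nn.Module, loader: DataLoader, max_epochs: int) -> list[float]:
    """Simplified stand-in for trainer.fit; returns the mean training loss per epoch."""
    # Same optimizer and scheduler as configure_optimizers
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)
    epoch_losses = []
    for _ in range(max_epochs):
        model.train()
        running = 0.0
        for x, y in loader:
            # Equivalent of training_step: forward pass and cross-entropy loss
            loss = nn.functional.cross_entropy(model(x), y)
            # The parts Lightning automates: zero grads, backpropagate, update weights
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            running += loss.item()
        scheduler.step()  # once per epoch, matching Lightning's default scheduler interval
        epoch_losses.append(running / len(loader))
    return epoch_losses

torch.manual_seed(0)
X = torch.randn(256, 4)
y = (X[:, 0] > 0).long()  # synthetic two-class labels
loader = DataLoader(TensorDataset(X, y), batch_size=64, shuffle=True)
losses = manual_fit(nn.Linear(4, 2), loader, max_epochs=3)
print(losses)
```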

Output

Image by author

Conclusion

PyTorch Lightning simplifies the process of scaling deep learning workflows with distributed training. By abstracting away the complexities of distributed training, PyTorch Lightning allows us to focus on designing and implementing our deep learning models without worrying about the low-level details. In this article, we walked through an example implementation that runs on a single device and can be scaled to multiple GPUs or machines by changing the Trainer's devices, num_nodes, and strategy arguments. By leveraging multiple GPUs or machines, we can significantly reduce the training time for large deep learning models.
