Data loading code can get messy and hard to maintain. We ideally want our data loading code to be decoupled from our model training code for better readability and modularity.

The torchdata library, used alongside PyTorch 2.0, provides two data primitives, torchdata.datapipes and torchdata.dataloader2.DataLoader2, that allow you to use pre-loaded datasets as well as your own data.

DataPipes are composable Iterable-style and Map-style building blocks that work well out of the box with the PyTorch 2.0 DataLoader2. The built-in DataPipes cover the necessary functionality, including loading files, parsing, caching, transforming, filtering, and many more utilities. You can find the full list of built-in IterDataPipes here and MapDataPipes here.

PyTorch 2.0 already has a large number of built-in DataPipes that cover most common data processing operations. If none of them meets your needs, you can create your own custom DataPipe.

Dataset

Here is an example of how to load the Dog Breed Identification dataset from Kaggle. It contains a training set and a test set of images of dogs. Each image has a filename that is its unique id. The dataset comprises 120 breeds of dogs.

Create DataPipe From Pandas DataFrame

For this example, we will first generate label ids using sklearn's LabelEncoder. Next, we will build our DataPipes to read and parse the Pandas DataFrame.

from sklearn.preprocessing import LabelEncoder
import pandas as pd

labels = pd.read_csv('/content/labels.csv')
IMAGE_ROOT='/content/train'

print('Training set: {}'.format(labels.shape))

# Encode the breed into digits
labels['label'] = LabelEncoder().fit_transform(labels.breed)

# Create an index-to-breed dictionary (label id -> breed name)
dict_df = labels[['label','breed']].copy()
dict_df.drop_duplicates(inplace=True)
dict_df.set_index('label',drop=True,inplace=True)

index_to_breed = dict_df.to_dict()['breed']
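
To make the encoding step more concrete, here is a minimal plain-Python sketch of the kind of mapping LabelEncoder produces for string labels: the unique class names are sorted and numbered consecutively from 0. The breed names are made up for illustration, and the names breed_to_index and index_to_breed_demo are hypothetical; this is not the sklearn implementation.

```python
# Hypothetical breed names, used only for illustration.
breeds = ["pug", "beagle", "pug", "collie", "beagle"]

# Sorted unique class names get consecutive integer ids, starting at 0.
classes = sorted(set(breeds))
breed_to_index = {breed: i for i, breed in enumerate(classes)}

# The reverse mapping plays the role of index_to_breed in the code above.
index_to_breed_demo = {i: breed for breed, i in breed_to_index.items()}

encoded = [breed_to_index[b] for b in breeds]
print(encoded)
print(index_to_breed_demo)
```

The reverse dictionary is what lets us turn a predicted or stored label id back into a readable breed name when plotting.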

Next, we wrap the Pandas DataFrame to create an IterDataPipe.

import os

import matplotlib.pyplot as plt
import numpy as np
from PIL import Image
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper
from torchvision import transforms

labels.head()

# Each row of the DataFrame (id, breed, label) becomes one item of the DataPipe
datapipe = IterableWrapper(labels.to_numpy())

The labels.csv file looks like this:

[Figure: Load Pandas DataFrame in DataPipe]

Creating a Custom DataPipe

All DataPipes that represent an iterable of data samples should subclass torchdata.datapipes.iter.IterDataPipe. This style of DataPipes is particularly useful when data come from a stream, or when the number of samples is too large to fit them all in memory. IterDataPipe is lazily initialized and its elements are computed only when next() is called on the iterator of an IterDataPipe.
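
The lazy behaviour described above can be illustrated with a small plain-Python stand-in. The class SquaresPipe and its computed attribute are hypothetical and exist only for this sketch; it is not a torchdata class, but it relies on the same generator mechanism: nothing runs until next() is called.

```python
class SquaresPipe:
    """Plain-Python stand-in for an iterable-style pipe (not a torchdata class)."""

    def __init__(self, source):
        # Only store the source; no element is read here.
        self.source = source
        self.computed = []  # records which source elements were processed

    def __iter__(self):
        for x in self.source:
            self.computed.append(x)
            yield x * x


pipe = SquaresPipe(range(1_000_000))
it = iter(pipe)

# Creating the iterator does not process any element yet.
state_before = list(pipe.computed)

first = next(it)   # processes the first element only
second = next(it)  # processes the second element only
state_after = list(pipe.computed)

print(state_before, first, second, state_after)
```

Even though the source has a million elements, only the two that were requested are ever computed.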

As a guiding example, let us implement an IterDataPipe that applies a callable (here, an image transform) to the items of its input iterator. Our custom DataPipe class implements three functions: __init__, __iter__, and __len__ (of which __len__ is optional). In this implementation, the Dog Breed Identification images are stored in a directory (root_dir in the code), and their labels are stored separately in a CSV file, labels.csv.

In the next sections, we’ll break down what’s happening in each of these functions.

__init__

The __init__ function is run once when instantiating the DataPipe object. We initialize the directory containing the images, the source DataPipe, and the transform.

class ImageLoadererIterDataPipe(IterDataPipe):
    def __init__(self, source_dp: IterDataPipe, transform, root_dir) -> None:
        super().__init__()

        self.source_dp = source_dp
        self.root_dir = root_dir
        self.transform = transform
   . . . . . . . .

Datasets are now generally constructed as stacks of DataPipes, so each DataPipe typically takes a source DataPipe as its first argument. Avoid loading data from the source DataPipe in the __init__ function, in order to support lazy data loading and save memory.

__iter__

All subclasses should override __iter__(), which returns an iterator over the samples in this DataPipe. Calling __iter__ of an IterDataPipe automatically invokes its reset() method, which by default performs no operation.

For IterDataPipes, an __iter__ function is needed to consume data from the source IterDataPipe and apply the operation to the data before yielding it. For each row from the source, it reads the label from the row, builds the image’s location on disk from the image id, opens the image with PIL’s Image.open(), applies the transform to it (which is where the image is converted to a tensor, assuming the transform includes that step), and yields the transformed image with its label.

 . . . .  . . .    
    def __iter__(self):
        for row in self.source_dp:
            label = row[2]

            img_path = os.path.join(self.root_dir, row[0] + '.jpg')
            image = Image.open(img_path)

            image = self.transform(image)

            yield image, label
 . . . . . . .

__len__

The __len__ method of our DataPipe returns the length of the source DataPipe. However, note that __len__ is optional for IterDataPipe and often inadvisable.

    def __len__(self):
        return len(self.source_dp)
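
One reason __len__ can be inadvisable is that some stages cannot know how many items they will yield without consuming their source. The sketch below uses a hypothetical plain-Python stand-in, KeepEvenPipe (not a torchdata class), whose output length depends on the data itself, so it raises TypeError from __len__ rather than returning a misleading number.

```python
class KeepEvenPipe:
    """Plain-Python stand-in for a filtering pipe (not a torchdata class)."""

    def __init__(self, source):
        self.source = source

    def __iter__(self):
        for x in self.source:
            if x % 2 == 0:
                yield x

    def __len__(self):
        # The number of surviving items is unknown until the data is read.
        raise TypeError("KeepEvenPipe has no valid length")


pipe = KeepEvenPipe([3, 4, 8, 9, 10])

try:
    len(pipe)
    length_known = True
except TypeError:
    length_known = False

# The only reliable way to get the count is to iterate.
count = sum(1 for _ in pipe)
print(length_known, count)
```

Returning len(self.source_dp), as our image-loading DataPipe does, is safe only because it yields exactly one sample per source row.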

These DataPipes can be invoked in two ways, using the class constructor or applying their functional form onto an existing IterDataPipe. You can chain multiple IterDataPipe together to form a pipeline that will perform multiple operations in succession.
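
The two invocation styles can be sketched with plain-Python stand-ins. The classes Pipe, MapPipe and FilterPipe below are hypothetical and written only for this illustration; torchdata registers its functional names through its own mechanism, which this sketch does not reproduce. The point is only that the functional form is a thin wrapper around the class constructor, so both styles build the same pipeline.

```python
class Pipe:
    """Plain-Python stand-in for an iterable-style pipe (not a torchdata class)."""

    def __init__(self, source):
        self.source = source

    def __iter__(self):
        yield from self.source

    # Functional forms: thin wrappers around the class constructors.
    def map(self, fn):
        return MapPipe(self, fn)

    def filter(self, predicate):
        return FilterPipe(self, predicate)


class MapPipe(Pipe):
    def __init__(self, source, fn):
        super().__init__(source)
        self.fn = fn

    def __iter__(self):
        for item in self.source:
            yield self.fn(item)


class FilterPipe(Pipe):
    def __init__(self, source, predicate):
        super().__init__(source)
        self.predicate = predicate

    def __iter__(self):
        for item in self.source:
            if self.predicate(item):
                yield item


def is_even(x):
    return x % 2 == 0


def times_ten(x):
    return x * 10


# Way 1: class constructors, nested from the inside out.
via_constructor = MapPipe(FilterPipe(Pipe(range(6)), is_even), times_ten)

# Way 2: functional form, chained left to right.
via_functional = Pipe(range(6)).filter(is_even).map(times_ten)

result_constructor = list(via_constructor)
result_functional = list(via_functional)
print(result_constructor, result_functional)
```

The functional form tends to read more naturally for long pipelines, because the stages appear in the order in which the data flows through them.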

Lastly, we put everything together. Each DataPipe is essentially a container that applies an operation to data yielded from a source DataPipe.

class ImageLoadererIterDataPipe(IterDataPipe):
    def __init__(self, source_dp: IterDataPipe, transform, root_dir) -> None:
        super().__init__()

        # Only store references here; no data is loaded until iteration
        self.source_dp = source_dp
        self.root_dir = root_dir
        self.transform = transform

    def __iter__(self):
        for row in self.source_dp:
            # Each row is (id, breed, label)
            label = row[2]

            img_path = os.path.join(self.root_dir, row[0] + '.jpg')
            image = Image.open(img_path)

            image = self.transform(image)

            yield image, label

    def __len__(self):
        # One sample per source row, so the lengths match
        return len(self.source_dp)


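# Assumed transform: train_transform is not defined in the original snippet.
# The 224x224 size is a guess; the mean/std match the values used for plotting below.
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
])

datapipe = IterableWrapper(labels.to_numpy())
datapipe = datapipe.shuffle()
datapipe = ImageLoadererIterDataPipe(datapipe, train_transform, IMAGE_ROOT)

datapipe = datapipe.batch(8)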
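
To show what the batching step hands to the next stage, here is a minimal plain-Python sketch of the grouping logic. The function batched and the sample values are hypothetical stand-ins, not the torchdata implementation: consecutive samples are collected into lists of batch_size items, and a shorter final batch is kept unless drop_last is set.

```python
def batched(samples, batch_size, drop_last=False):
    """Group consecutive samples into lists of at most batch_size items."""
    batch = []
    for sample in samples:
        batch.append(sample)
        if len(batch) == batch_size:
            yield batch
            batch = []
    # Emit the shorter final batch unless the caller asked to drop it.
    if batch and not drop_last:
        yield batch


# Stand-ins for (image, label) pairs; real samples would hold image tensors.
samples = [("img0", 3), ("img1", 7), ("img2", 1), ("img3", 3), ("img4", 0)]

batches = list(batched(samples, 2))
batches_dropped = list(batched(samples, 2, drop_last=True))

# Inside a batch, element i is one (image, label) pair.
first_image, first_label = batches[0][0]
print(len(batches), len(batches_dropped), first_image, first_label)
```

This is why a batch is indexed as batch[i][0] for the image and batch[i][1] for the label.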

Iterating and Visualizing the Dataset

Unlike a map-style Dataset, an IterDataPipe cannot be indexed like a list (datapipe[index]); we iterate over it instead. We take the first batch and use matplotlib to visualize some samples from our training data.

figure = plt.figure(figsize=(8, 4))
cols, rows = 4, 2

# Mean and std used to undo the normalization before plotting
mean = np.array([0.485, 0.456, 0.406])
std = np.array([0.229, 0.224, 0.225])

for batch in datapipe:
    for i in range(cols * rows):
        img, label = batch[i]

        # Convert from (C, H, W) to (H, W, C) and undo the normalization
        inp = img.numpy().transpose((1, 2, 0))
        inp = np.clip(std * inp + mean, 0, 1)

        figure.add_subplot(rows, cols, i + 1)
        plt.title(index_to_breed[label])
        plt.axis("off")
        plt.imshow(inp)
    break  # only plot the first batch

plt.show()

More generally, an IterDataPipe does not have to read from files: calling iter(datapipe) could just as well return a stream of data read from a database.
