PyTorch Usage

Datasets and Loaders

Posted by Welt Xing on September 2, 2021

While learning torch, I found that building the network structure itself is not the hard part: define a class, declare the layers in the constructor, and then fill in the forward method following the order of the layers, like this:

import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(4, 5)
        self.fc2 = nn.Linear(5, 3)
 
    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.fc2(x)
        x = F.relu(x)
        return x

and we have a simple neural network with one hidden layer. The hard part is building the data interface: the way PyTorch sets up datasets always used to confuse me, so this post digs into the syntax as a reference for later work.

Necessity

Strictly speaking, a train loader is not necessary. Take the iris dataset from sklearn, a three-class classification problem, as an example; we load the features and labels:

from sklearn.datasets import load_iris
X, y = load_iris(return_X_y=True)

Here X and y are arrays of shape 150×4 and 150 respectively. Since the dataset is tiny, we can feed the whole training set forward at once (after converting it to a tensor):

net = Net()
output = net(torch.from_numpy(X).float())  # nn.Linear expects a float32 tensor, not a numpy array

and then run backpropagation, gradient descent, and so on. But for larger datasets, say 1000 samples, we often use mini-batch gradient descent to make training more efficient, and that is where we need to define the data-handling interface ourselves.
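To make this concrete, here is a minimal hand-rolled mini-batch loop, a sketch of what a loader automates (the batch size and the shuffling scheme here are my own choices, not part of any API):

import numpy as np

batch_size = 10
indices = np.random.permutation(len(X))  # reshuffle the sample order each epoch
for start in range(0, len(X), batch_size):
    batch = indices[start:start + batch_size]
    batch_X = torch.from_numpy(X[batch]).float()  # one mini-batch of features
    batch_y = torch.from_numpy(y[batch]).long()   # and its labels
    # forward + backward + parameter update on (batch_X, batch_y) goes here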

Custom Datasets and Loaders

A custom dataset inherits from the abstract class torch.utils.data.Dataset and overrides two methods, __len__ and __getitem__:

class MyDataset(torch.utils.data.Dataset):
    def __init__(self, data_src):
        super(MyDataset, self).__init__()
        self.data_src = data_src

    def __getitem__(self, index):
        return self.data_src[index]

    def __len__(self):
        return len(self.data_src)

Instantiate this class, then hand it to a DataLoader:

X, y = load_iris(return_X_y=True)
dataset = MyDataset(X)

train_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=10,
    shuffle=True,
)

for x in train_loader:
    print(x)

We set the batch_size to 10, so for the 150 iris samples the code above prints 15 tensors like this:

tensor([[6.7000, 3.1000, 5.6000, 2.4000],
        [6.9000, 3.1000, 5.1000, 2.3000],
        [5.8000, 2.7000, 5.1000, 1.9000],
        [6.8000, 3.2000, 5.9000, 2.3000],
        [6.7000, 3.3000, 5.7000, 2.5000],
        [6.7000, 3.0000, 5.2000, 2.3000],
        [6.3000, 2.5000, 5.0000, 1.9000],
        [6.5000, 3.0000, 5.2000, 2.0000],
        [6.2000, 3.4000, 5.4000, 2.3000],
        [5.9000, 3.0000, 5.1000, 1.8000]], dtype=torch.float64)

But such a dataset is not quite what we want: we usually need to load each sample together with its label. (Also note the dtype=torch.float64 above: the loader's default collation turns numpy arrays into tensors but keeps their original dtype, which will matter later.) So we define the Dataset like this:

class MyDataset(torch.utils.data.Dataset):
    def __init__(self, data, label):
        super(MyDataset, self).__init__()
        self.data, self.label = data, label

    def __getitem__(self, index):
        # return the sample together with its label
        return self.data[index], self.label[index]

    def __len__(self):
        return len(self.data)

Similarly, we instantiate the class and hand it to a loader:

X, y = load_iris(return_X_y=True)
dataset = MyDataset(X, y)

train_loader = torch.utils.data.DataLoader(
    dataset,
    batch_size=5,
    shuffle=False,
)

for x, y in train_loader:
    print(x, y)

With batch_size set to 5, the loop prints 30 tensor pairs like this:

tensor([[4.6000, 3.2000, 1.4000, 0.2000],
        [4.9000, 3.1000, 1.5000, 0.1000],
        [5.5000, 2.5000, 4.0000, 1.3000],
        [5.0000, 3.5000, 1.6000, 0.6000],
        [5.0000, 3.2000, 1.2000, 0.2000]], dtype=torch.float64) tensor([0, 0, 1, 0, 0], dtype=torch.int32)
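As an aside, for plain array data like this, the built-in torch.utils.data.TensorDataset does the same index-and-pair job without writing a custom class. A minimal sketch (the variable names are mine):

from torch.utils.data import TensorDataset, DataLoader

pair_dataset = TensorDataset(
    torch.from_numpy(X).float(),  # features as float32
    torch.from_numpy(y).long(),   # labels as int64
)
pair_loader = DataLoader(pair_dataset, batch_size=5, shuffle=False)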

Practical Use

Having mastered datasets and loaders, we no longer need to push every sample through in a single batch:

# loop over the dataset multiple times
for epoch in range(5):
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print('Loss: {}'.format(running_loss))

print('Finished Training')

In practice, the iris data needs a type conversion after loading, for example:

from sklearn.model_selection import train_test_split

train_X, test_X, train_y, test_y = train_test_split(
    X.astype('float32'),  # float64 by default
    y.astype('int64'),    # int32 by default
    train_size=0.7,
)

Otherwise the forward pass raises a dtype error: nn.Linear parameters default to float32, and CrossEntropyLoss expects int64 class indices.
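Equivalently, the conversion can be done on the torch side rather than in numpy; a quick sketch:

X_t = torch.from_numpy(X.astype('float32'))  # nn.Linear parameters default to float32
y_t = torch.from_numpy(y.astype('int64'))    # CrossEntropyLoss expects int64 class indices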

A Standard Workflow

The following is a complete training-and-testing pipeline:

import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.datasets import load_iris
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score


class IrisDataset(Dataset):
    def __init__(self, X, y):
        super(IrisDataset, self).__init__()
        self.X, self.y = X, y

    def __getitem__(self, index):
        return self.X[index], self.y[index]

    def __len__(self):
        return len(self.X)


X, y = load_iris(return_X_y=True)

train_X, test_X, train_y, test_y = train_test_split(
    X.astype('float32'),
    y.astype('int64'),
    train_size=0.7,
)

train_loader = DataLoader(
    IrisDataset(train_X, train_y),
    batch_size=4,
    shuffle=True,
)
# the test_loader is a bit redundant, since the test set needs no batching; it is kept for a uniform workflow
test_loader = DataLoader(
    IrisDataset(test_X, test_y),
    batch_size=100,
)


class IrisNet(nn.Module):
    def __init__(self):
        super(IrisNet, self).__init__()
        self.fc1 = nn.Linear(4, 10)
        self.fc2 = nn.Linear(10, 3)

    def forward(self, x):
        x = self.fc1(x)
        x = F.elu(x)
        x = self.fc2(x)
        x = F.elu(x)
        return x


net = IrisNet()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-3)
criterion = torch.nn.CrossEntropyLoss()

# loop over the dataset multiple times
for epoch in range(50):
    running_loss = 0.0
    for i, data in enumerate(train_loader, 0):
        inputs, labels = data

        # zero the parameter gradients
        optimizer.zero_grad()

        # forward + backward + optimize
        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print('Loss: {}'.format(running_loss))

print('Finished Training')

for x, y in test_loader:
    output = net(x)
    pred = torch.argmax(output, dim=1)
    print(accuracy_score(y, pred))
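One refinement worth noting: inference needs no gradients, so the evaluation loop is usually wrapped in torch.no_grad(), with net.eval() called first (a no-op for this particular net, but it matters once dropout or batch-norm layers appear). A minimal sketch:

net.eval()  # switch layers like dropout/batch-norm to inference mode
with torch.no_grad():  # skip building the autograd graph during inference
    for x, y in test_loader:
        pred = torch.argmax(net(x), dim=1)
        print(accuracy_score(y, pred))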