CNN

Tao Zou

2024-02-06

Image Data Processing

Show, save and load a gray image and a color image from <np.ndarray>.

import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

# Random 8-bit test images: a single-channel (100, 100) array and an RGB (100, 100, 3) array.
gray_image_array = np.random.randint(0, 256, size=(100, 100), dtype=np.uint8)
color_image_array = np.random.randint(0, 256, size=(100, 100, 3), dtype=np.uint8)

# Show gray and color images side by side
plt.subplot(1, 2, 1)
plt.axis('off')
plt.imshow(gray_image_array, cmap='gray')
plt.subplot(1, 2, 2)
plt.axis('off')
plt.imshow(color_image_array)
plt.show()

# Save images
image = Image.fromarray(gray_image_array)
image.save('gray_image.png')

image = Image.fromarray(color_image_array)
image.save('color_image.png')

# Load images and check that the round trip reproduces the ORIGINAL arrays.
# The `with` blocks close the underlying files once the pixel data has been copied out.
with Image.open('gray_image.png') as loaded_gray_image:
    loaded_gray_image_array = np.array(loaded_gray_image)
print(np.array_equal(loaded_gray_image_array, gray_image_array))

with Image.open('color_image.png') as loaded_color_image:
    loaded_color_image_array = np.array(loaded_color_image)
# Compare against the source array; comparing against the loaded PIL image
# would only compare the loaded data with itself.
print(np.array_equal(loaded_color_image_array, color_image_array))

features_transform, defined below, is a common transform pipeline for a <PIL Image>.

from torchvision import transforms

# Preprocessing pipeline: resize, convert to a tensor, then normalise each of the 3 channels.
features_transform = transforms.Compose(
    [
        # give every image the same 256x256 shape
        transforms.Resize((256, 256)),
        transforms.ToTensor(),
        # per-channel (x - 0.5) / 0.5
        transforms.Normalize(mean=[0.5] * 3, std=[0.5] * 3),
    ]
)

transforms.Resize() only receives a <PIL Image>. This function should always be placed at the beginning.

transforms.ToTensor()’s input format is shape (height, width, channel), and the output format is shape (channel, height, width). What’s more, if the input is a <PIL Image> or an <np.ndarray> with a dtype of <np.uint8>, its values will be scaled from [0, 255] to the range [0.0, 1.0].

transforms.Normalize() will be applied to every channel.

LeNet

LeNet structure code

# The input image's shape is (num_channels: 1, height: 28, width: 28)
# The number of output classes is 10
import torch
from torch import nn

# LeNet: two convolution/pooling stages followed by three fully connected layers.
model = nn.Sequential(
    # --- feature extractor ---
    nn.Conv2d(in_channels=1, out_channels=6, kernel_size=5, padding=2),  # 28x28 -> 28x28
    nn.Sigmoid(),
    nn.AvgPool2d(kernel_size=2, stride=2),  # -> 14x14
    nn.Conv2d(in_channels=6, out_channels=16, kernel_size=5),  # -> 10x10
    nn.Sigmoid(),
    nn.AvgPool2d(kernel_size=2, stride=2),  # -> 5x5
    # --- classifier ---
    nn.Flatten(),  # 16 channels * 5 * 5 = 400 features
    nn.Linear(in_features=16 * 5 * 5, out_features=120),
    nn.Sigmoid(),
    nn.Linear(in_features=120, out_features=84),
    nn.Sigmoid(),
    nn.Linear(in_features=84, out_features=10),
)

nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, dilation, groups, bias=True, padding_mode='zeros')

  • in_channels: represent the number of feature images(channels) in the input layer;

  • out_channels: represents the number of channels in the output layer, meanwhile, it is the number of convolutional kernels;

  • kernel_size=5: means that each convolutional kernel is 5×5 pixels.

Fashion-MNIST Data Loading code

import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import numpy as np

# Preprocessing for the single-channel Fashion-MNIST images.
transform = transforms.Compose([
    transforms.Resize((28, 28)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5]),
])

# train=True selects the training split; train=False is used here as the validation set.
# Both are downloaded into ./data when not already present.
train_dataset = torchvision.datasets.FashionMNIST(
    root='./data', train=True, download=True, transform=transform,
)
val_dataset = torchvision.datasets.FashionMNIST(
    root='./data', train=False, download=True, transform=transform,
)

LeNet training code

from torch.utils.data import DataLoader

# Train on the GPU when one is visible to PyTorch, otherwise on the CPU.
device = "cuda" if torch.cuda.is_available() else "cpu"

def init_weights(m):
    """Xavier-uniform initialise the weight of a Linear or Conv2d layer.

    Intended to be passed to ``nn.Module.apply``, which calls it once for
    every sub-module. Modules of any other type are left untouched, and
    biases are not modified.

    Args:
        m: the module currently visited by ``apply``.
    """
    # isinstance (rather than an exact type comparison) also covers subclasses.
    if isinstance(m, (nn.Linear, nn.Conv2d)):
        nn.init.xavier_uniform_(m.weight)
# Hyper-parameters
num_epochs = 30
batch_size = 128
lr = 0.3

# Re-initialise the Linear/Conv2d weights, then move the model to the training device.
model.apply(init_weights)
model.to(device)

# Mean cross-entropy over the batch, optimised with SGD plus weight decay.
criterion = nn.CrossEntropyLoss(reduction='mean')
optimizer = torch.optim.SGD(
    model.parameters(),
    lr=lr,
    weight_decay=0.05,
)

# Only the training data is shuffled; validation order is kept fixed.
train_loader = DataLoader(
    train_dataset, batch_size=batch_size, shuffle=True, num_workers=2
)
val_loader = DataLoader(
    val_dataset, batch_size=batch_size, shuffle=False, num_workers=2
)

for epoch in range(num_epochs):
    # ---- one pass over the training data ----
    model.train()
    train_loss = 0
    for batch_features, batch_labels in train_loader:
        batch_features = batch_features.to(device)
        batch_labels = batch_labels.to(device)

        optimizer.zero_grad()
        loss = criterion(model(batch_features), batch_labels)
        loss.backward()
        optimizer.step()

        train_loss += loss.item()
    # average of the per-batch mean losses
    train_loss /= len(train_loader)

    # ---- validate on every second epoch only ----
    if (epoch + 1) % 2 != 0:
        continue

    model.eval()
    true_label, pred_label = [], []
    val_loss = 0
    with torch.no_grad():
        for batch_features, batch_labels in val_loader:
            batch_features = batch_features.to(device)
            batch_labels = batch_labels.to(device)
            val_outputs = model(batch_features)
            val_loss += criterion(val_outputs, batch_labels).item()
            # collect labels and arg-max predictions on the CPU for the accuracy below
            true_label.extend(batch_labels.cpu().numpy().ravel())
            pred_label.extend(np.argmax(val_outputs.cpu().numpy(), axis=1))
    val_loss /= len(val_loader)
    # fraction of validation samples whose predicted class matches the label
    val_accuracy = np.mean(np.array(true_label) == np.array(pred_label))
    print('epoch {}/{} | train loss: {:.4f}, val loss: {:.4f} | Accuracy: {:.2f}'.format(epoch+1, num_epochs, train_loss, val_loss, val_accuracy))

The training result can reach an accuracy of \(87\%\).

LeNet prediction code

# Run the trained model over an unlabeled test set and collect the predicted class indices.
# NOTE(review): MyDataset and test_data are not defined in this file; presumably MyDataset is a
# custom Dataset taking (data, labels, transform) and yielding (features, label) pairs — TODO confirm.
# NOTE(review): features_transform resizes to 256x256 and normalises 3 channels, while the LeNet
# above is documented for 1x28x28 inputs — confirm which transform is intended here.
test_dataset = MyDataset(test_data, None, features_transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
pred_label = []
model.eval()
with torch.no_grad():
    # the second element of each batch is ignored
    for batch_features, _ in test_loader:
        batch_features = batch_features.to(device)
        test_outputs = model(batch_features)
        # arg-max over dim 1 picks the highest-scoring class for every sample
        pred_label.extend(torch.argmax(test_outputs, dim=1).cpu().numpy())
# np.save('save/path', pred_label)

AlexNet

code

# AlexNet adapted to single-channel input.
# The input image's shape is (num_channels: 1, height: 224, width: 224)
# The number of output classes is 10
# Spatial sizes noted below follow from kernel/stride/padding for a 224x224 input.
model = nn.Sequential(
    # 224 -> 54: floor((224 + 2*1 - 11) / 4) + 1
    nn.Conv2d(1, 96, kernel_size=11, stride=4, padding=1), nn.ReLU(),
    # 54 -> 26
    nn.MaxPool2d(kernel_size=3, stride=2),
    # 26 -> 26 (padding=2 keeps the size for a 5x5 kernel)
    nn.Conv2d(96, 256, kernel_size=5, padding=2), nn.ReLU(),
    # 26 -> 12
    nn.MaxPool2d(kernel_size=3, stride=2),
    # three 3x3 convolutions with padding=1 keep the 12x12 size
    nn.Conv2d(256, 384, kernel_size=3, padding=1), nn.ReLU(),
    nn.Conv2d(384, 384, kernel_size=3, padding=1), nn.ReLU(),
    nn.Conv2d(384, 256, kernel_size=3, padding=1), nn.ReLU(),
    # 12 -> 5
    nn.MaxPool2d(kernel_size=3, stride=2),
    # 256 channels * 5 * 5 = 6400 features
    nn.Flatten(),
    nn.Linear(6400, 4096), nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(4096, 4096), nn.ReLU(),
    nn.Dropout(0.5),
    # one output score per class
    nn.Linear(4096, 10)
)

The in_channels of the first layer is 1 instead of 3 in order to train on the Fashion-MNIST data set, which consists of gray-scale images.

The model training code is the same as the LeNet training code. Because this is a deep convolutional network, perhaps I should try a smaller learning rate. The final accuracy can reach \(92\%\).

Batch normalization

batch normalization

Batch normalization layer can constrain the batch data in network’s dataflow, alleviate the problem of gradient vanishing, and make deep networks converge more easily. It is applied between “affine transformation” and “activation function”.

The normalization formula is slightly different from the common one.

\[ BN(\boldsymbol{x})=\gamma\cdot\frac{\boldsymbol{x}-E(\boldsymbol{x})}{\sqrt{Var(\boldsymbol{x})}}+\beta\\ E(\boldsymbol{x})=\frac{1}{n}\sum_{i=1}^n\boldsymbol{x}_i\\ Var(\boldsymbol{x})=\frac{1}{n}\sum_{i=1}^n\big(\boldsymbol{x}_i-E(\boldsymbol{x})\big)^2+\epsilon \]

\(\gamma\) and \(\beta\) are parameters that need to be learned by the model.

For example, suppose I have an input of dimension \((batch\_size=2, num\_channels=3, height=2, width=2)\).

\[ \begin{bmatrix}\begin{bmatrix}\color{red}{x^{(1)}_{11}}&\color{red}{x^{(1)}_{12}}\\\color{red}{x^{(1)}_{21}}&\color{red}{x^{(1)}_{22}}\end{bmatrix}&\begin{bmatrix}\color{green}{x^{(1)}_{11}}&\color{green}{x^{(1)}_{12}}\\\color{green}{x^{(1)}_{21}}&\color{green}{x^{(1)}_{22}}\end{bmatrix}&\begin{bmatrix}\color{blue}{x^{(1)}_{11}}&\color{blue}{x^{(1)}_{12}}\\\color{blue}{x^{(1)}_{21}}&\color{blue}{x^{(1)}_{22}}\end{bmatrix}\end{bmatrix}\\ \begin{bmatrix}\begin{bmatrix}\color{red}{x^{(2)}_{11}}&\color{red}{x^{(2)}_{12}}\\\color{red}{x^{(2)}_{21}}&\color{red}{x^{(2)}_{22}}\end{bmatrix}&\begin{bmatrix}\color{green}{x^{(2)}_{11}}&\color{green}{x^{(2)}_{12}}\\\color{green}{x^{(2)}_{21}}&\color{green}{x^{(2)}_{22}}\end{bmatrix}&\begin{bmatrix}\color{blue}{x^{(2)}_{11}}&\color{blue}{x^{(2)}_{12}}\\\color{blue}{x^{(2)}_{21}}&\color{blue}{x^{(2)}_{22}}\end{bmatrix}\end{bmatrix} \]

\[ \color{red}{mean}=\frac{1}{2\times2\times2}\sum_k\sum_i\sum_j\color{red}{x^{(k)}_{ij}}\\ \color{red}{var}=\frac{1}{2\times2\times2}\sum_k\sum_i\sum_j\big{(}\color{red}{x^{(k)}_{ij}}-\color{red}{mean}\big{)}^2 \]

# BatchNorm2d over 3 channels, applied to a tiny hand-written batch.
bn = nn.BatchNorm2d(3)
#  input_2d shape: (2, 3, 2, 2)
input_2d = torch.tensor(
    [
        # sample 1: channels 0, 1, 2
        [[[1, 2], [3, 4]], [[5, 6], [7, 8]], [[1, 3], [4, 5]]],
        # sample 2: channels 0, 1, 2
        [[[1, 2], [1, 2]], [[5, 6], [5, 6]], [[1, 3], [2, 4]]],
    ],
    dtype=torch.float32,
)
# No gradients are needed for this demonstration.
with torch.no_grad():
    output_2d = bn(input_2d)
print(output_2d)
## tensor([[[[-1.0000,  0.0000],
##           [ 1.0000,  2.0000]],
## 
##          [[-1.0000,  0.0000],
##           [ 1.0000,  2.0000]],
## 
##          [[-1.3750,  0.0917],
##           [ 0.8250,  1.5584]]],
## 
## 
##         [[[-1.0000,  0.0000],
##           [-1.0000,  0.0000]],
## 
##          [[-1.0000,  0.0000],
##           [-1.0000,  0.0000]],
## 
##          [[-1.3750,  0.0917],
##           [-0.6417,  0.8250]]]])

AlexNet using batch normalization

# AlexNet using batch normalization
# Each Conv2d/Linear (except the output layer) is followed by a batch-norm layer
# placed before its ReLU: BatchNorm2d after convolutions, BatchNorm1d after linear layers.
model = nn.Sequential(
    nn.Conv2d(1, 96, kernel_size=11, stride=4, padding=1), nn.BatchNorm2d(96), nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nn.Conv2d(96, 256, kernel_size=5, padding=2), nn.BatchNorm2d(256), nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2),
    nn.Conv2d(256, 384, kernel_size=3, padding=1), nn.BatchNorm2d(384), nn.ReLU(),
    nn.Conv2d(384, 384, kernel_size=3, padding=1), nn.BatchNorm2d(384), nn.ReLU(),
    nn.Conv2d(384, 256, kernel_size=3, padding=1), nn.BatchNorm2d(256), nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2),
    # for a 1x224x224 input the feature map here is 256 x 5 x 5 = 6400 values
    nn.Flatten(),
    nn.Linear(6400, 4096), nn.BatchNorm1d(4096), nn.ReLU(),
    nn.Dropout(0.5),
    nn.Linear(4096, 4096), nn.BatchNorm1d(4096), nn.ReLU(),
    nn.Dropout(0.5),
    # output layer: 10 class scores, no normalisation or activation
    nn.Linear(4096, 10)
)

ResNet

A ResNet block

import torch.nn as nn
import torch.nn.functional as F
import torch

class Residual(nn.Module):
    """Residual unit: two 3x3 conv + batch-norm layers plus a shortcut connection.

    Args:
        input_channels: number of channels of the incoming feature map.
        num_channels: number of channels produced by the unit.
        use_1x1conv: when True, the shortcut goes through a 1x1 convolution
            (with the same stride) so its shape matches the main path.
        strides: stride of the first 3x3 convolution and of the 1x1 shortcut.
    """

    def __init__(self, input_channels, num_channels, use_1x1conv=False, strides=1):
        super().__init__()
        # main path
        self.conv1 = nn.Conv2d(
            input_channels, num_channels, kernel_size=3, stride=strides, padding=1
        )
        self.conv2 = nn.Conv2d(num_channels, num_channels, kernel_size=3, padding=1)

        # shortcut path: identity unless a projection is requested
        self.conv3 = (
            nn.Conv2d(input_channels, num_channels, kernel_size=1, stride=strides)
            if use_1x1conv
            else None
        )

        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        """Return relu(main_path(X) + shortcut(X))."""
        shortcut = X if self.conv3 is None else self.conv3(X)
        out = self.conv1(X)
        out = F.relu(self.bn1(out))
        out = self.bn2(self.conv2(out))
        out += shortcut
        return F.relu(out)

ResNet-18

# The input image's shape is (num_channels: 1, height: 224, width: 224)
# The number of output classes is 10
def resnet_block(input_channels, num_channels, num_residuals, first_block=False):
    """Build the list of Residual units that make up one ResNet stage.

    In the stages b3, b4 and b5 the first unit changes the channel count
    (use_1x1conv=True) and halves the spatial size (strides=2). b2 is built
    with first_block=True, so none of its units changes channels or size.

    Args:
        input_channels: channels entering the stage.
        num_channels: channels produced by every unit of the stage.
        num_residuals: number of Residual units in the stage.
        first_block: True for the stage that directly follows the stem (b2).

    Returns:
        A list of Residual modules, in execution order.
    """
    def build_unit(index):
        # only the leading unit of a non-first stage changes channels and resolution
        if index == 0 and not first_block:
            return Residual(input_channels, num_channels, use_1x1conv=True, strides=2)
        return Residual(num_channels, num_channels)

    return [build_unit(i) for i in range(num_residuals)]

# Stem: 7x7 stride-2 convolution followed by a 3x3 stride-2 max-pool.
# For a 224x224 input this gives 224 -> 112 -> 56.
b1 = nn.Sequential(
    nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3),
    nn.BatchNorm2d(64), nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)
# Four stages of two residual units each. b2 keeps channels and resolution;
# b3, b4 and b5 each change the channel count and halve the resolution in their first unit.
b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))
b3 = nn.Sequential(*resnet_block(64, 128, 2))
b4 = nn.Sequential(*resnet_block(128, 256, 2))
b5 = nn.Sequential(*resnet_block(256, 512, 2))
# Global average pooling reduces each of the 512 feature maps to one value,
# which the final linear layer maps to 10 class scores.
model = nn.Sequential(
    b1, b2, b3, b4, b5,
    nn.AdaptiveAvgPool2d((1, 1)),
    nn.Flatten(), nn.Linear(512, 10)
)

The training result can reach an accuracy of \(91\%\).

Dataflow

# Push one dummy 1x224x224 image through the model and print the output
# shape after each top-level layer.
X = torch.randn(1, 1, 224, 224)
for layer in model:
    X = layer(X)
    # `__class__` was misspelled as `__calss__`, which raised AttributeError.
    print(layer.__class__.__name__, 'output shape: \t', X.shape)

Conv3d

Self-designed model

import torch
import torch.nn as nn

# The input image's shape is (num_channels: 3, frames: 4, height: 224, width: 224)
# The number of output classes is 10
class VideoClassifier(nn.Module):
    """Small 3D-convolutional classifier for RGB video clips.

    Expects input of shape (batch, 3, num_frames, height, width). Two
    Conv3d + ReLU + MaxPool3d stages extract features, then three fully
    connected layers produce the class scores.

    Args:
        num_classes: number of output classes.
        num_frames: frames per clip the model is built for (default 4).
        height: frame height the model is built for (default 224).
        width: frame width the model is built for (default 224).

    Raises:
        ValueError: if num_frames, height or width is smaller than 4, since
            the two pooling layers would leave no features.
    """

    def __init__(self, num_classes, num_frames=4, height=224, width=224):
        super().__init__()

        if min(num_frames, height, width) < 4:
            raise ValueError(
                "num_frames, height and width must each be at least 4, got "
                f"({num_frames}, {height}, {width})"
            )

        self.conv_layers = nn.Sequential(
            nn.Conv3d(3, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=2, stride=2),

            nn.Conv3d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool3d(kernel_size=2, stride=2),
        )

        # The padded 3x3x3 convolutions keep the size; each MaxPool3d(2, 2)
        # maps a size n to n // 2, so two of them give n // 4 per dimension.
        # With the defaults this is 64 * 1 * 56 * 56, as before.
        flat_features = 64 * (num_frames // 4) * (height // 4) * (width // 4)

        self.fc_layers = nn.Sequential(
            nn.Flatten(),

            nn.Linear(flat_features, 1024),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.5),

            nn.Linear(512, num_classes),
        )

    def forward(self, x):
        """Return class scores of shape (batch, num_classes)."""
        # fc_layers starts with nn.Flatten(), so no manual reshape is needed.
        return self.fc_layers(self.conv_layers(x))

# NOTE(review): the comment above the class mentions 10 output classes, but 15 are requested here — confirm which is intended.
model = VideoClassifier(num_classes=15)

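If I have more than 4 frames per clip, for example 16, the temporal dimension after the two MaxPool3d(kernel_size=2, stride=2) layers is 16 / 4 = 4 instead of 1, so I can change nn.Linear(64 * 1 * 56 * 56, 1024) to nn.Linear(64 * 4 * 56 * 56, 1024). Alternatively, I can add two more conv/pool blocks to self.conv_layers so that the temporal dimension is reduced to 1 again; the in_features of the first nn.Linear must then be adjusted to the new channel count and the smaller spatial size.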

ResNet18-3d

from torchvision.models.video import r3d_18

num_classes = 10
# NOTE(review): `pretrained=True` is deprecated in newer torchvision releases in favour of
# the `weights=` argument — confirm the installed version before switching.
model = r3d_18(pretrained=True)
# Replace the classification head; read its input width from the existing
# layer instead of hard-coding 512.
model.fc = nn.Linear(model.fc.in_features, num_classes)

CNN & RNN

The code below shows ResNet-18 + LSTM structure model.

import torch
import torch.nn as nn
import torch.nn.functional as F

class Residual(nn.Module):
    """Residual unit: two 3x3 conv + batch-norm layers plus a shortcut connection.

    Args:
        input_channels: number of channels of the incoming feature map.
        num_channels: number of channels produced by the unit.
        use_1x1conv: when True, the shortcut goes through a 1x1 convolution
            (with the same stride) so its shape matches the main path.
        strides: stride of the first 3x3 convolution and of the 1x1 shortcut.
    """

    def __init__(self, input_channels, num_channels, use_1x1conv=False, strides=1):
        super().__init__()
        # main path
        self.conv1 = nn.Conv2d(
            input_channels, num_channels, kernel_size=3, stride=strides, padding=1
        )
        self.conv2 = nn.Conv2d(num_channels, num_channels, kernel_size=3, padding=1)

        # shortcut path: identity unless a projection is requested
        self.conv3 = (
            nn.Conv2d(input_channels, num_channels, kernel_size=1, stride=strides)
            if use_1x1conv
            else None
        )

        self.bn1 = nn.BatchNorm2d(num_channels)
        self.bn2 = nn.BatchNorm2d(num_channels)

    def forward(self, X):
        """Return relu(main_path(X) + shortcut(X))."""
        shortcut = X if self.conv3 is None else self.conv3(X)
        out = self.conv1(X)
        out = F.relu(self.bn1(out))
        out = self.bn2(self.conv2(out))
        out += shortcut
        return F.relu(out)

def resnet_block(input_channels, num_channels, num_residuals, first_block=False):
    """Build the list of Residual units that make up one ResNet stage.

    Unless first_block is True, the first unit changes the channel count
    (use_1x1conv=True) and halves the spatial size (strides=2); every other
    unit keeps num_channels and the resolution.

    Returns:
        A list of Residual modules, in execution order.
    """
    def build_unit(index):
        # only the leading unit of a non-first stage changes channels and resolution
        if index == 0 and not first_block:
            return Residual(input_channels, num_channels, use_1x1conv=True, strides=2)
        return Residual(num_channels, num_channels)

    return [build_unit(i) for i in range(num_residuals)]

# ResNet-18 style feature extractor for 3-channel frames.
# Stem: 7x7 stride-2 convolution followed by a 3x3 stride-2 max-pool.
b1 = nn.Sequential(
    nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3),
    nn.BatchNorm2d(64), nn.ReLU(),
    nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
)
# Four stages of two residual units each. b2 keeps channels and resolution;
# b3, b4 and b5 each change the channel count and halve the resolution in their first unit.
b2 = nn.Sequential(*resnet_block(64, 64, 2, first_block=True))
b3 = nn.Sequential(*resnet_block(64, 128, 2))
b4 = nn.Sequential(*resnet_block(128, 256, 2))
b5 = nn.Sequential(*resnet_block(256, 512, 2))
# No classification layer here: global average pooling plus Flatten yields
# one 512-dimensional feature vector per image.
CNN_model = nn.Sequential(
    b1, b2, b3, b4, b5,
    nn.AdaptiveAvgPool2d((1, 1)),
    nn.Flatten()
)

class CNNtoRNN(nn.Module):
    """Per-frame CNN features fed to an LSTM; the last time step is classified.

    Args:
        num_classes: number of output classes.
        rnn_hidden_size: hidden size of the LSTM.
        num_layers: number of stacked LSTM layers.
        cnn: module mapping (N, channels, height, width) to
            (N, cnn_output_features). Defaults to the module-level CNN_model.
        cnn_output_features: feature size produced by `cnn` (512 for CNN_model).
    """

    def __init__(self, num_classes, rnn_hidden_size, num_layers=1,
                 cnn=None, cnn_output_features=512):
        super().__init__()

        # Fall back to the shared module-level extractor when none is supplied.
        self.cnn = CNN_model if cnn is None else cnn

        self.rnn_hidden_size = rnn_hidden_size
        self.num_layers = num_layers
        self.rnn = nn.LSTM(input_size=cnn_output_features,
                           hidden_size=self.rnn_hidden_size,
                           num_layers=self.num_layers,
                           batch_first=True)
        self.fc = nn.Linear(self.rnn_hidden_size, num_classes)

    def forward(self, x):
        """Classify a batch of clips.

        Args:
            x: tensor of shape (batch_size, win_len, channels, height, width).

        Returns:
            Tensor of shape (batch_size, num_classes).
        """
        batch_size, win_len, channels, height, width = x.shape

        # CNN: fold the time axis into the batch axis so every frame is
        # processed independently.
        #   c_in:  (batch_size * win_len, channels, height, width)
        #   c_out: (batch_size * win_len, cnn_output_features)
        # reshape (not view) also accepts non-contiguous input, e.g. a clip
        # that was permuted from (batch, channels, frames, H, W).
        c_in = x.reshape(batch_size * win_len, channels, height, width)
        c_out = self.cnn(c_in)

        # RNN: restore the time axis.
        #   r_in:  (batch_size, win_len, cnn_output_features)
        #   r_out: (batch_size, win_len, rnn_hidden_size), the top-layer
        #          hidden state at every time step
        r_in = c_out.reshape(batch_size, win_len, -1)
        r_out, _ = self.rnn(r_in)

        # Classify from the hidden state of the last time step.
        return self.fc(r_out[:, -1, :])

# Smoke test: build the CNN + LSTM model and run one random batch through it.
model = CNNtoRNN(num_classes=15, rnn_hidden_size=32)

# 16 clips of 10 RGB frames, each 224x224
input_tensor = torch.rand(16, 10, 3, 224, 224) # (batch_size, frames, channels, height, width)
output = model(input_tensor)
print(output.shape)  # output's shape is (batch_size, num_classes)

For the video classification task, CNN & RNN performs worse than the 3D CNN.