import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision.models as models
from torchvision.utils import make_grid
from torchsummary import summary

# Select the compute device: GPU when CUDA is available, otherwise CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Report the Torch / CUDA / cuDNN versions (useful for checking the Colab runtime).
print(f"Torch version:{torch.__version__}")
print(f"cuda version: {torch.version.cuda}")
print(f"cudnn version:{torch.backends.cudnn.version()}")
# (notebook output)
# Torch version:1.10.0+cpu
# cuda version: None
# cudnn version:None
# Project root on the mounted Google Drive.
ROOT = "/content/drive/MyDrive"
DIR = "product_classification"  # name of the folder that holds the data folder and the .ipynb file
PATH = os.path.join(ROOT, DIR)
os.chdir(PATH)  # move into the project PATH so the relative Dataset/ paths below resolve

# Train / validation splits are provided as .npy files; X_test is the data
# used for the final submitted predictions.
X_train = np.load("Dataset/Train/X_train.npy")
Y_train = np.load("Dataset/Train/Y_train.npy")
X_val = np.load("Dataset/Valid/X_val.npy")
Y_val = np.load("Dataset/Valid/Y_val.npy")
X_test = np.load("Dataset/Test/X_test.npy")

BATCH_SIZE = 64

# Convert numpy arrays to torch tensors: float inputs, long (int64) class labels.
X_train, X_val, X_test = (torch.from_numpy(a).float() for a in (X_train, X_val, X_test))
Y_train, Y_val = (torch.from_numpy(a).long() for a in (Y_train, Y_val))

# Print data info
print(f"X_train shape: {X_train.shape}")
print(f"X_val shape: {X_val.shape}")

# Wrap the tensors in datasets and mini-batch loaders; only the training
# split is shuffled each epoch.
train = TensorDataset(X_train, Y_train)
val = TensorDataset(X_val, Y_val)
train_loader = DataLoader(train, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val, batch_size=BATCH_SIZE, shuffle=False)

# Custom function to compute accuracy

def accuracy(outputs, labels):
    """Fraction of samples whose argmax prediction matches its label.

    Returns a 0-dim float tensor so the value can be accumulated like a loss.
    """
    preds = torch.argmax(outputs, dim=1)
    correct = (preds == labels).sum().item()
    return torch.tensor(correct / len(preds))
class CNN(nn.Module):
    """Small three-stage convolutional classifier.

    Three stride-2 conv blocks (Conv3x3 -> BatchNorm -> ReLU), a 2x2 max-pool
    after the last one, then a single linear layer over the flattened feature
    map. The Linear expects 7200 features, i.e. a final 32 x 15 x 15 map
    (e.g. 3 x 247 x 247 inputs).
    """

    def __init__(self, num_classes):
        super(CNN, self).__init__()

        def conv_block(c_in, c_out, pool=False):
            # Conv(3x3, stride 2) -> BN -> ReLU, optionally followed by a 2x2 max-pool.
            layers = [
                nn.Conv2d(in_channels=c_in, out_channels=c_out, kernel_size=(3, 3), stride=(2, 2)),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
            ]
            if pool:
                layers.append(nn.MaxPool2d(2, 2))
            return nn.Sequential(*layers)

        self.conv1 = conv_block(3, 16)
        self.conv2 = conv_block(16, 32)
        self.conv3 = conv_block(32, 32, pool=True)
        self.lin = nn.Sequential(nn.Linear(7200, num_classes))

    def forward(self, x):
        # Run the three conv stages in order, then classify the flattened map.
        for stage in (self.conv1, self.conv2, self.conv3):
            x = stage(x)
        return self.lin(x.flatten(start_dim=1))

# pytorch에서 제공하는 CNN module 사용하기
# Use a CNN module provided by pytorch/torchvision
class Select_model(nn.Module):
    """Wrap a torchvision backbone and replace its classification head.

    Supported model_name values: "vgg", "resnet", "googlenet",
    "efficientnet_b0". The final layer is swapped for a fresh
    nn.Linear(num_ftrs, num_classes); set pretrained=True to start from
    ImageNet weights.
    """

    def __init__(self, model_name, num_classes=4, pretrained=False):
        super(Select_model, self).__init__()

        if model_name == "vgg":
            self.network = models.vgg16(pretrained=pretrained)
            # BUG FIX: vgg16 has no .fc attribute — its head is classifier[6],
            # which must itself be replaced (the original assigned an unused
            # .fc layer, leaving the 1000-way head in place).
            num_ftrs = self.network.classifier[6].in_features
            self.network.classifier[6] = nn.Linear(num_ftrs, num_classes)
        elif model_name == "resnet":
            self.network = models.resnet18(pretrained=pretrained)
            # Replace last layer
            num_ftrs = self.network.fc.in_features
            self.network.fc = nn.Linear(num_ftrs, num_classes)
        elif model_name == "googlenet":
            # aux_logits=False drops the auxiliary classifiers so forward()
            # returns a single tensor.
            self.network = models.googlenet(pretrained=pretrained, aux_logits=False)
            # Replace last layer
            num_ftrs = self.network.fc.in_features
            self.network.fc = nn.Linear(num_ftrs, num_classes)
        elif model_name == "efficientnet_b0":
            # BUG FIX: efficientnet_b0 accepts no aux_logits argument (the
            # original call raised TypeError), and its head is classifier[1],
            # not .fc.
            self.network = models.efficientnet_b0(pretrained=pretrained)
            num_ftrs = self.network.classifier[1].in_features
            self.network.classifier[1] = nn.Linear(num_ftrs, num_classes)
        else:
            raise ValueError(f"Unknown model_name: {model_name!r}")

    def forward(self, xb):
        return self.network(xb)
# Instantiate the scratch CNN (4 product classes) and dump its layer structure.
model = CNN(4)
print(model)
# (notebook output)
# CNN(
#   (conv1): Sequential(
#     (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2))
#     (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#     (2): ReLU()
#   )
#   (conv2): Sequential(
#     (0): Conv2d(16, 32, kernel_size=(3, 3), stride=(2, 2))
#     (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#     (2): ReLU()
#   )
#   (conv3): Sequential(
#     (0): Conv2d(32, 32, kernel_size=(3, 3), stride=(2, 2))
#     (1): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#     (2): ReLU()
#     (3): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
#   )
#   (lin): Sequential(
#     (0): Linear(in_features=7200, out_features=4, bias=True)
#   )
# )
# Instantiate the torchvision-backed model (ResNet-18 head swapped to 4 classes)
# and dump its layer structure.
model = Select_model("resnet")
print(model)
# (notebook output)
# Select_model(
#   (network): ResNet(
#     (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
#     (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#     (relu): ReLU(inplace=True)
#     (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
#     (layer1): Sequential(
#       (0): BasicBlock(
#         (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (relu): ReLU(inplace=True)
#         (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#       )
#       (1): BasicBlock(
#         (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (relu): ReLU(inplace=True)
#         (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#       )
#     )
#     (layer2): Sequential(
#       (0): BasicBlock(
#         (conv1): Conv2d(64, 128, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
#         (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (relu): ReLU(inplace=True)
#         (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (downsample): Sequential(
#           (0): Conv2d(64, 128, kernel_size=(1, 1), stride=(2, 2), bias=False)
#           (1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         )
#       )
#       (1): BasicBlock(
#         (conv1): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (relu): ReLU(inplace=True)
#         (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#       )
#     )
#     (layer3): Sequential(
#       (0): BasicBlock(
#         (conv1): Conv2d(128, 256, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
#         (bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (relu): ReLU(inplace=True)
#         (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (downsample): Sequential(
#           (0): Conv2d(128, 256, kernel_size=(1, 1), stride=(2, 2), bias=False)
#           (1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         )
#       )
#       (1): BasicBlock(
#         (conv1): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn1): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (relu): ReLU(inplace=True)
#         (conv2): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn2): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#       )
#     )
#     (layer4): Sequential(
#       (0): BasicBlock(
#         (conv1): Conv2d(256, 512, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
#         (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (relu): ReLU(inplace=True)
#         (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (downsample): Sequential(
#           (0): Conv2d(256, 512, kernel_size=(1, 1), stride=(2, 2), bias=False)
#           (1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         )
#       )
#       (1): BasicBlock(
#         (conv1): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn1): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#         (relu): ReLU(inplace=True)
#         (conv2): Conv2d(512, 512, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
#         (bn2): BatchNorm2d(512, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
#       )
#     )
#     (avgpool): AdaptiveAvgPool2d(output_size=(1, 1))
#     (fc): Linear(in_features=512, out_features=4, bias=True)
#   )
# )

# Training

"""
2. hyperparameter를 바꿔봅시다!
- 처음에 제공되는 값을 임의로 설정한 값입니다. 바꾸시고 돌리시면 됩니다!
"""
# learning rate : 모델의 학습 속도를 조절합니다. 
lr = 1e-10

# weight_decay : L2 regularization으로 모델의 overfitting을 방지합니다
weight_decay = 1e-10

# epoch : 전체 dataset을 몇번 학습 시킬지 조절해보세요
epochs = 2

# model 선택

## net = CNN(num_classes=4).to(device)
net = Select_model('resnet').to(device)

# optimizer 선택 (gradient descent : model update 방법): SGD, Adam, RMSProp
# pytorch에서 제공하는 optimizer : https://pytorch.org/docs/stable/optim.html
opt = "Adam"
if opt == "SGD":
    optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay) 
elif opt == "Adam":
    optimizer = optim.Adam(net.parameters(), lr=lr, betas=(0.9, 0.999), weight_decay=weight_decay)
elif opt == "RMSProb":
    optimizer = optim.RMSProb(net.parameters(), lr=lr, alpha = 0.9, momentum=0.9, weight_decay=weight_decay)

# learning rate decay : 학습 도중 learning rate를 조절하는 technique 
# pytorch에서 제공하는 learning rate decay : https://pytorch.org/docs/stable/optim.html
lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.995)  # epoch마다 x0.995 만큼 lr 감소
# training
# training bookkeeping: per-epoch curves for later plotting
train_loss_list = []
val_loss_list = []
train_correct_list = []
val_correct_list = []
result = {}

# Loss function: cross-entropy over the class logits.
criterion = nn.CrossEntropyLoss()

for epoch in range(1, epochs + 1):
    net.train()
    train_loss = 0
    val_loss_total = 0  # renamed from valLoss for consistency with train_loss
    train_correct = 0
    val_correct = 0
    start_time = time.time()

    # --- training pass ---
    # `inputs`/`labels` renamed so the builtin input() is not shadowed.
    for batch_idx, (inputs, labels) in enumerate(train_loader):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        out = net(inputs)
        loss = criterion(out, labels)
        train_loss += loss.item()
        loss.backward()
        optimizer.step()

        train_correct += accuracy(out, labels)

    # --- validation pass ---
    # eval() moved out of the batch loop (it was redundantly re-applied per
    # batch); no_grad() skips autograd bookkeeping during evaluation.
    net.eval()
    with torch.no_grad():
        for batch_idx, (val_input, val_label) in enumerate(val_loader):
            val_input, val_label = val_input.to(device), val_label.to(device)
            val_out = net(val_input)
            val_loss = criterion(val_out, val_label)
            val_loss_total += val_loss.item()
            val_correct += accuracy(val_out, val_label)

    print("[=] EPOCH [{:}/{:}] TIME [{:.3}s]".format(epoch, epochs, time.time()-start_time) + \
          " | TRAIN_LOSS [{:.3}] TRAIN_ACC [{:.3}] VAL_LOSS [{:.3}] VAL_ACC [{:.3}] ".format(
              train_loss / len(train_loader), train_correct / len(train_loader), val_loss_total / len(val_loader), val_correct / len(val_loader)))
    train_loss_list.append(train_loss / len(train_loader))
    train_correct_list.append(train_correct.item() / len(train_loader))
    val_loss_list.append(val_loss_total / len(val_loader))
    val_correct_list.append(val_correct.item() / len(val_loader))
    lr_scheduler.step()  # learning-rate scheduler step (once per epoch)

# Collect the run's curves for plotting/comparison.
result['train_loss'] = train_loss_list
result['train_acc'] = train_correct_list
result['val_loss'] = val_loss_list
result['val_acc'] = val_correct_list
total_result = [result]

# Visualize the training results

def plot_acc(total_result):
    """Plot per-epoch train/validation accuracy for each recorded run.

    BUG FIX: the original unpacked the collected curves into a single
    plt.plot(*curves) call; with more than one run matplotlib interprets the
    second curve as y-values plotted against the first as x-values. Plotting
    each curve separately is correct for any number of runs.
    """
    for run in total_result:
        plt.plot(run['train_acc'])
        plt.plot(run['val_acc'])
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.legend(['Training', 'Validation'])
    plt.title('Accuracy per epochs')
    plt.show()


def plot_loss(total_result):
    """Plot per-epoch train/validation loss for each recorded run.

    BUG FIX: the original unpacked the collected curves into a single
    plt.plot(*curves) call; with more than one run matplotlib interprets the
    second curve as y-values plotted against the first as x-values. Plotting
    each curve separately is correct for any number of runs.
    """
    for run in total_result:
        plt.plot(run['train_loss'])
        plt.plot(run['val_loss'])
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['Training', 'Validation'])
    plt.title('Loss per epochs')
    plt.show()

# Render the accuracy and loss curves for the run(s) collected above.
plot_acc(total_result)
plot_loss(total_result)
# Inspect test predictions
def predict_test(img, model):
    """Return predicted class indices for a batch of images.

    img: float tensor, assumed (N, C, H, W) — moved onto the global `device`
    here (TODO confirm callers always pass a batch dimension).
    BUG FIX: inference now runs under torch.no_grad(), so no autograd graph
    is built for throwaway forward passes.
    """
    model.eval()
    with torch.no_grad():
        logits = model(img.to(device))
    _, pred = torch.max(logits, dim=1)

    return pred

# test data shape
print(X_test.shape)

# Predict every test image (one at a time) and collect the labels as ints.
preds = [predict_test(X_test[i:i + 1], net).item() for i in range(len(X_test))]

print(len(preds))  # 개수가 1120개가 맞는지 확인하세요!

# DataFrame columns: sequential ids + predicted labels.
# `ids` renamed from `id` so the builtin id() is not shadowed.
ids = list(range(len(X_test)))
test_preds = {'id': ids, 'label': preds}

# Submit the results

# Make output directory: where the test-prediction file is saved.
SAVE_PATH = os.path.join(PATH, "output")
# makedirs(exist_ok=True) replaces the racy exists()/mkdir() pair and also
# creates intermediate directories if needed.
os.makedirs(SAVE_PATH, exist_ok=True)

# Make submission file
team = "Big_Star"

# 이 밑은 수정하지 마세요.  (Do not modify below this line.)
sub = pd.DataFrame(test_preds)
sub.to_csv(os.path.join(SAVE_PATH, f"./{team}_submission.csv"), index=False)

# Case 2

from torchvision import models

# Fine-tuning setup: look up a pretrained backbone by name and swap its
# final classifier layer for a fresh 5-way head.
model_arch = 'vgg11'
num_classes = 5

# getattr is the idiomatic way to resolve an attribute by name
# (equivalent to indexing models.__dict__).
model = getattr(models, model_arch)(pretrained=True)

in_features = model.classifier[6].in_features
model.classifier[6] = nn.Linear(in_features, num_classes)
model = model.to(device)