# CNN code 1
# CNN setup, ResNet
import os
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import time
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torchvision.models as models
from torchvision.utils import make_grid
from torchsummary import summary
device = torch.device("cuda" if torch.cuda.is_available() else "cpu") # GPU 할당
# Colab torch, gpu 확인
print("Torch version:{}".format(torch.__version__))
print("cuda version: {}".format(torch.version.cuda))
print("cudnn version:{}".format(torch.backends.cudnn.version()))
ROOT = "/content/drive/MyDrive"
DIR = "product_classification" # data 폴더와 ipynb 파일이 위치한 폴더명을 적으세요.
PATH = os.path.join(ROOT, DIR)
os.chdir(PATH) # 현재 프로젝트 PATH로 이동
# train, validation은 npy형태로 제공됩니다.
X_train = np.load("Dataset/Train/X_train.npy")
Y_train = np.load("Dataset/Train/Y_train.npy")
X_val = np.load("Dataset/Valid/X_val.npy")
Y_val = np.load("Dataset/Valid/Y_val.npy")
X_test = np.load("Dataset/Test/X_test.npy") # 최종 제출할 prediction 결과에 사용할 데이터입니다.
BATCH_SIZE = 64
X_train = torch.from_numpy(X_train).float()
Y_train = torch.from_numpy(Y_train).long()
X_val = torch.from_numpy(X_val).float()
Y_val = torch.from_numpy(Y_val).long()
X_test = torch.from_numpy(X_test).float()
# Print data info
print(f"X_train shape: {X_train.shape}")
print(f"X_val shape: {X_val.shape}")
train = TensorDataset(X_train, Y_train)
val = TensorDataset(X_val, Y_val)
# Train_Loader : mini batch 분할
train_loader = DataLoader(train, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val, batch_size=BATCH_SIZE, shuffle=False)
Accuracy 계산 커스텀 함수
def accuracy(outputs, labels):
_, preds = torch.max(outputs, dim=1)
return torch.tensor(torch.sum(preds == labels).item() / len(preds))
class CNN(nn.Module):
    """Small three-stage convolutional classifier.

    Three Conv(3x3, stride 2) -> BatchNorm -> ReLU stages (the last followed
    by a 2x2 max-pool), then a single linear layer over the flattened
    feature map. The linear layer expects 7200 flattened features
    (32 channels x 15 x 15 spatial), which fixes the supported input size.
    """

    def __init__(self, num_classes):
        super(CNN, self).__init__()

        def _block(c_in, c_out, pool=False):
            # One conv stage: Conv2d -> BatchNorm2d -> ReLU, optional max-pool.
            layers = [
                nn.Conv2d(in_channels=c_in, out_channels=c_out,
                          kernel_size=(3, 3), stride=(2, 2)),
                nn.BatchNorm2d(c_out),
                nn.ReLU(),
            ]
            if pool:
                layers.append(nn.MaxPool2d(2, 2))
            return nn.Sequential(*layers)

        # Attribute names and layer order match the original, so checkpoints
        # (state_dict keys conv1.0.*, conv1.1.*, ..., lin.0.*) stay compatible.
        self.conv1 = _block(3, 16)
        self.conv2 = _block(16, 32)
        self.conv3 = _block(32, 32, pool=True)
        self.lin = nn.Sequential(
            nn.Linear(7200, num_classes),
        )

    def forward(self, x):
        feats = self.conv3(self.conv2(self.conv1(x)))
        return self.lin(feats.flatten(start_dim=1))
# Use CNN modules provided by torchvision.
class Select_model(nn.Module):
    """Wrap a torchvision classifier with its final layer resized.

    Args:
        model_name: one of "vgg", "resnet", "googlenet", "efficientnet_b0".
        num_classes: output dimension of the replacement head.
        pretrained: load ImageNet weights when True.

    Raises:
        ValueError: for an unrecognized model_name (previously this silently
            left ``self.network`` unset and failed later in forward()).
    """

    def __init__(self, model_name, num_classes=4, pretrained=False):
        super(Select_model, self).__init__()
        if model_name == "vgg":
            self.network = models.vgg16(pretrained=pretrained)
            # Fixed: VGG's head lives at classifier[6]; assigning to
            # ``self.network.fc`` only created a dead attribute and left the
            # 1000-way ImageNet head in place.
            num_ftrs = self.network.classifier[6].in_features
            self.network.classifier[6] = nn.Linear(num_ftrs, num_classes)
        elif model_name == "resnet":
            self.network = models.resnet18(pretrained=pretrained)
            # Replace the final fully-connected layer.
            num_ftrs = self.network.fc.in_features
            self.network.fc = nn.Linear(num_ftrs, num_classes)
        elif model_name == "googlenet":
            # aux_logits=False so forward() returns a single tensor.
            self.network = models.googlenet(pretrained=pretrained, aux_logits=False)
            num_ftrs = self.network.fc.in_features
            self.network.fc = nn.Linear(num_ftrs, num_classes)
        elif model_name == "efficientnet_b0":
            # Fixed: efficientnet_b0 takes no aux_logits argument (passing it
            # raised TypeError), and its head is classifier[1], not .fc.
            self.network = models.efficientnet_b0(pretrained=pretrained)
            num_ftrs = self.network.classifier[1].in_features
            self.network.classifier[1] = nn.Linear(num_ftrs, num_classes)
        else:
            raise ValueError(f"unknown model_name: {model_name!r}")

    def forward(self, xb):
        return self.network(xb)
model = CNN(num_classes=4)
print(model)
model = Select_model('resnet')
print(model)
"""
2. hyperparameter를 바꿔봅시다!
- 처음에 제공되는 값을 임의로 설정한 값입니다. 바꾸시고 돌리시면 됩니다!
"""
# learning rate : 모델의 학습 속도를 조절합니다.
lr = 1e-10
# weight_decay : L2 regularization으로 모델의 overfitting을 방지합니다
weight_decay = 1e-10
# epoch : 전체 dataset을 몇번 학습 시킬지 조절해보세요
epochs = 2
# model 선택
## net = CNN(num_classes=4).to(device)
net = Select_model('resnet').to(device)
# optimizer 선택 (gradient descent : model update 방법): SGD, Adam, RMSProp
# pytorch에서 제공하는 optimizer : https://pytorch.org/docs/stable/optim.html
opt = "Adam"
if opt == "SGD":
optimizer = optim.SGD(net.parameters(), lr=lr, momentum=0.9, weight_decay=weight_decay)
elif opt == "Adam":
optimizer = optim.Adam(net.parameters(), lr=lr, betas=(0.9, 0.999), weight_decay=weight_decay)
elif opt == "RMSProb":
optimizer = optim.RMSProb(net.parameters(), lr=lr, alpha = 0.9, momentum=0.9, weight_decay=weight_decay)
# learning rate decay : 학습 도중 learning rate를 조절하는 technique
# pytorch에서 제공하는 learning rate decay : https://pytorch.org/docs/stable/optim.html
lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.995) # epoch마다 x0.995 만큼 lr 감소
# training
train_loss_list = []
val_loss_list = []
train_correct_list = []
val_correct_list = []
result = {}
# loss 함수
criterion = nn.CrossEntropyLoss()
for epoch in range(1, epochs + 1):
net.train()
train_loss = 0
valLoss = 0
train_correct = 0
val_correct = 0
start_time = time.time()
# training
for batch_idx, (input, label) in enumerate(train_loader):
input, label = input.to(device), label.to(device)
optimizer.zero_grad()
out = net(input)
loss = criterion(out, label)
train_loss += loss.item()
loss.backward()
optimizer.step()
train_acc = accuracy(out, label)
train_correct += train_acc
# validation
with torch.no_grad():
for batch_idx, (val_input, val_label) in enumerate(val_loader):
net.eval()
val_input, val_label = val_input.to(device), val_label.to(device)
val_out = net(val_input)
val_loss = criterion(val_out, val_label)
val_acc = accuracy(val_out, val_label)
valLoss += val_loss.item()
val_correct += val_acc
print("[=] EPOCH [{:}/{:}] TIME [{:.3}s]".format(epoch, epochs, time.time()-start_time) + \
" | TRAIN_LOSS [{:.3}] TRAIN_ACC [{:.3}] VAL_LOSS [{:.3}] VAL_ACC [{:.3}] ".format(
train_loss / len(train_loader), train_correct / len(train_loader), valLoss / len(val_loader), val_correct/len(val_loader)))
train_loss_list.append(train_loss / len(train_loader))
train_correct_list.append(train_correct.item() / len(train_loader))
val_loss_list.append(valLoss/len(val_loader))
val_correct_list.append(val_correct.item()/len(val_loader))
lr_scheduler.step() # learning rate schedular step
# 결과 저장
result['train_loss'] = train_loss_list
result['train_acc'] = train_correct_list
result['val_loss'] = val_loss_list
result['val_acc'] = val_correct_list
total_result = []
total_result.append(result)
def plot_acc(total_result):
    """Plot training and validation accuracy curves per epoch."""
    # Draw the training curve first, then validation, matching the legend order.
    for key in ('train_acc', 'val_acc'):
        series = [run[key] for run in total_result]
        plt.plot(*series)
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.legend(['Training', 'Validation'])
    plt.title('Accuracy per epochs')
    plt.show()
def plot_loss(total_result):
    """Plot training and validation loss curves per epoch."""
    # Draw the training curve first, then validation, matching the legend order.
    for key in ('train_loss', 'val_loss'):
        series = [run[key] for run in total_result]
        plt.plot(*series)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend(['Training', 'Validation'])
    plt.title('Loss per epochs')
    plt.show()
plot_acc(total_result)
plot_loss(total_result)
# Inspect test results.
def predict_test(img, model):
    """Return predicted class indices for a batch of images.

    Args:
        img: input tensor (assumes a batched image tensor — TODO confirm shape).
        model: classifier returning per-class scores of shape (N, C).

    Returns:
        LongTensor of shape (N,) with the argmax class per input row.
    """
    model.eval()
    # Resolve the device from the model itself instead of the module-level
    # `device` global, so inference works wherever the model actually lives.
    try:
        target_device = next(model.parameters()).device
    except StopIteration:
        target_device = torch.device("cpu")  # parameter-free model
    # no_grad: pure inference, skip autograd graph construction.
    with torch.no_grad():
        y = model(img.to(target_device))
    _, pred = torch.max(y, dim=1)
    return pred
# test data shape
print(X_test.shape)
# test data 예측결과 list로 저장
preds = []
for i in range(len(X_test)):
pred = predict_test(X_test[i:i+1], net)
preds.append(pred.item())
print(len(preds)) # 개수가 1120개가 맞는지 확인하세요!
# DataFrame
# id 추가
id = [i for i in range(len(X_test))]
test_preds = {'id': id, 'label': preds}
# Make output directory: where the test-prediction file is saved.
SAVE_PATH = os.path.join(PATH, "output")
# makedirs(exist_ok=True) replaces the mkdir/else-pass pattern: it is
# race-free and also creates missing parent directories.
os.makedirs(SAVE_PATH, exist_ok=True)
# Make submission file
team = "Big_Star"
# Do not modify below this line.
sub = pd.DataFrame(test_preds)
sub.to_csv(os.path.join(SAVE_PATH, f"./{team}_submission.csv"), index=False)
from torchvision import models
model_arch='vgg11'
num_classes=5
model = models.__dict__[model_arch](pretrained = True)
in_features = model.classifier[6].in_features
model.classifier[6] = nn.Linear(in_features, num_classes)
model = model.to(device)