티스토리 뷰
목차
지난 글에서는 Fashion MNIST를 사용한 Vanilla GAN 코드에 대해 살펴보았다.
[PyTorch]생성적 적대 신경망(GANGenerative Adversarial Network)
이번 글에서는 오토인코더와 K-평균 알고리즘을 결합한 깊은 K-평균 알고리즘에 대해 알아보려 한다.
비지도학습도 K-평균 알고리즘도 처음인지라, 먼저 각 개념에 대해 정리하고 코드를 요약하겠다.
글이 길어질 것만 같은 기분이 들지만, 일단 시작!
비지도 학습
비지도 학습은 데이터에 대한 레이블 없이 입력 데이터의 패턴이나 구조를 학습하는 방법이다.
데이터에 대한 정답(레이블)이 없기 때문에 학습 과정에서 지도 학습처럼 입력-출력 관계를 학습하지 않고,
데이터를 스스로 그룹화하거나 내재된 특성을 찾아내는 데 초점이 맞춰진다.
비지도 학습은 다음과 같은 이유로 중요하다.
- 레이블 없는 데이터 활용
대부분의 데이터는 레이블이 없는 형태로 존재한다. 비지도 학습은 이러한 레이블 없는 데이터를 분석하고 유용한 정보를 추출할 수 있게 한다. - 데이터의 패턴 탐색
데이터의 잠재적 구조, 군집, 또는 분포를 이해할 수 있다. 이는 데이터의 분류, 탐색적 데이터 분석, 특징 추출 등에 유용하다. - 전이 학습 및 초기화
비지도 학습으로 사전 학습된 모델은 지도 학습에서 더 빠르고 효율적으로 학습할 수 있도록 돕는다. 특히 초기화 또는 특성 추출 단계에서 중요한 역할을 한다. - 비용 절감
레이블 데이터를 생성하는 데 드는 비용과 시간이 절약된다.
K-평균 알고리즘
K-평균(K-means) 알고리즘은 비지도 학습에서 널리 사용되는 군집화(clustering) 방법이다.
주어진 데이터를 K개의 그룹으로 나누고,
각 데이터 포인트를 가장 가까운 클러스터 중심(centroid)에 할당하는 방식으로 동작한다.
- 알고리즘의 주요 단계
- 초기화: K개의 클러스터 중심을 무작위로 선택한다.
- 할당: 각 데이터 포인트를 가장 가까운 클러스터 중심에 할당한다.
- 업데이트: 각 클러스터의 중심을 다시 계산한다. 클러스터에 포함된 데이터 포인트의 평균값으로 중심을 갱신한다.
- 반복: 할당과 업데이트 단계를 중심 위치가 수렴하거나 특정 반복 횟수에 도달할 때까지 반복한다.
- 장점과 단점
- 장점: 간단하고 빠르며 대규모 데이터셋에도 효과적이다.
- 단점: 초기 중심값 선택에 따라 결과가 달라지고, 구형 분포를 가정하기 때문에 복잡한 클러스터 구조에 적합하지 않을 수 있다.
- 응용 분야
이미지 압축, 문서 군집화, 데이터 시각화 등 다양한 분야에서 활용된다.
여기서 사용된 알고리즘
여기서 살펴볼 코드는 오토인코더(Autoencoder)와 K-평균 알고리즘을 결합한 깊은 K-평균(Deep K-means) 알고리즘이다.
일반적인 K-평균 알고리즘과의 주요 차이점은
오토인코더를 사용해 데이터를 잠재 공간(latent space)으로 변환한 후 클러스터링을 수행한다는 점이다.
그러니까 오토인코더를 일종의 고오급 차원축소 기법으로 사용하는 K-평균 알고리즘이라 보면 된다.
- 오토인코더
오토인코더는 입력 데이터를 저차원 공간(latent space)으로 압축(인코딩)한 뒤 이를 다시 복원(디코딩)하는 신경망이다.
- 인코더: 데이터를 저차원 표현으로 압축한다.
- 디코더: 압축된 표현을 원래 데이터로 복원한다.
- 목표: 입력 데이터와 복원된 데이터 간의 차이를 최소화(MSE 손실 사용).
- 깊은 K-평균의 학습 과정
- 오토인코더의 인코더를 통해 데이터를 잠재 공간으로 압축한다.
- 잠재 공간에서 K-평균 알고리즘을 적용해 클러스터를 형성한다.
- 클러스터 중심(centroids)을 활용해 데이터 점과 중심 간의 차이를 추가 손실 함수로 학습에 반영한다.
- 오토인코더와 클러스터링 과정이 상호 보완적으로 작동하며, 데이터의 잠재 표현과 군집화를 동시에 최적화한다.
- 장점
- 높은 차원의 복잡한 데이터를 저차원으로 압축해 클러스터링 성능을 향상시킨다.
- 클러스터 중심이 학습 과정에 통합되어, K-평균의 초기화 민감도를 줄일 수 있다.
- 사용된 코드의 주요 요소
- Kmeans 모듈: 잠재 공간에서 각 데이터 점을 클러스터 중심에 할당.
- Encoder와 Decoder: 데이터를 잠재 공간으로 변환하고 다시 복원.
- 손실 함수: 입력 이미지와 복원 이미지 간의 차이(l_rec)와 잠재 표현과 클러스터 중심 간의 차이(l_clt)를 합산.
- 학습 전략: 손실 함수의 가중치(alpha)를 점진적으로 증가시켜 군집화와 복원 성능을 함께 개선.
선 요약
이 글에서 다룰 코드는 아래와 같다.
import torch
import torch.nn as nn
import torchvision
from torchvision import transforms
import numpy as np
from scipy.optimize import linear_sum_assignment as linear_assignment
from sklearn.manifold import TSNE
from matplotlib import pyplot as plt
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"{device} is available.")
batch_size = 128
num_clusters = 10
latent_size = 10
trainset = torchvision.datasets.MNIST(
"./data/", download=True, train=True, transform=transforms.ToTensor()
)
testset = torchvision.datasets.MNIST(
"./data/", download=True, train=False, transform=transforms.ToTensor()
)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True)
testloader = torch.utils.data.DataLoader(testset, batch_size=batch_size, shuffle=False)
class Flatten(torch.nn.Module):
def forward(self, x):
batch_size = x.shape[0]
return x.view(batch_size, -1)
class Deflatten(nn.Module):
def __init__(self, k):
super(Deflatten, self).__init__()
self.k = k
def forward(self, x):
s = x.size()
feature_size = int((s[1] // self.k) ** 0.5)
return x.view(s[0], self.k, feature_size, feature_size)
class Kmeans(nn.Module):
def __init__(self, num_clusters, latent_size):
super(Kmeans, self).__init__()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
self.num_clusters = num_clusters
self.centroids = nn.Parameter(
torch.rand((self.num_clusters, latent_size)).to(device)
)
def argminl2distance(self, a, b):
return torch.argmin(torch.sum((a - b) ** 2, dim=1), dim=0)
def forward(self, x):
y_assign = []
for m in range(x.size(0)):
h = x[m].expand(self.num_clusters, -1)
assign = self.argminl2distance(h, self.centroids)
y_assign.append(assign.item())
return y_assign, self.centroids[y_assign]
class Encoder(nn.Module):
def __init__(self, latent_size):
super(Encoder, self).__init__()
k = 16
self.encoder = nn.Sequential(
nn.Conv2d(1, k, 3, stride=2),
nn.ReLU(),
nn.Conv2d(k, 2 * k, 3, stride=2),
nn.ReLU(),
nn.Conv2d(2 * k, 4 * k, 3, stride=1),
nn.ReLU(),
Flatten(),
nn.Linear(1024, latent_size),
nn.ReLU(),
)
def forward(self, x):
return self.encoder(x)
class Decoder(nn.Module):
def __init__(self, latent_size):
super(Decoder, self).__init__()
k = 16
self.decoder = nn.Sequential(
nn.Linear(latent_size, 1024),
nn.ReLU(),
Deflatten(4 * k),
nn.ConvTranspose2d(
4 * k, 2 * k, 3, stride=1
), # (입력 채널 수, 출력 채널 수, 필터 크기, stride)
nn.ReLU(),
nn.ConvTranspose2d(2 * k, k, 3, stride=2),
nn.ReLU(),
nn.ConvTranspose2d(k, 1, 3, stride=2, output_padding=1),
nn.Sigmoid(),
)
def forward(self, x):
return self.decoder(x)
def cluster_acc(y_true, y_pred):
y_true = np.array(y_true)
y_pred = np.array(y_pred)
D = max(y_pred.max(), y_true.max()) + 1
w = np.zeros((D, D), dtype=np.int64)
for i in range(y_pred.size):
w[y_pred[i], y_true[i]] += 1
ind = linear_assignment(w.max() - w)
return sum([w[i, j] for i, j in zip(ind[0], ind[1])]) * 1.0 / y_pred.size
def evaluation(testloader, encoder, kmeans, device):
predictions = []
actual = []
with torch.no_grad():
encoder.eval()
for images, labels in testloader:
inputs = images.to(device)
labels = labels.to(device)
latent_var = encoder(inputs)
y_pred, _ = kmeans(latent_var)
predictions += y_pred
actual += labels.cpu().tolist()
encoder.train()
return cluster_acc(actual, predictions)
encoder = Encoder(latent_size).to(device)
decoder = Decoder(latent_size).to(device)
kmeans = Kmeans(num_clusters, latent_size).to(device)
# Loss and optimizer
criterion1 = torch.nn.MSELoss()
criterion2 = torch.nn.MSELoss()
optimizer = torch.optim.Adam(
list(encoder.parameters()) + list(decoder.parameters()) + list(kmeans.parameters()),
lr=1e-3,
)
# Training
T1 = 50
T2 = 200
lam = 1e-3
ls = 0.05
for ep in range(300):
if (ep > T1) and (ep < T2):
alpha = lam * (ep - T1) / (T2 - T1) # 1/100, 2/100, .., 99/100
elif ep >= T2:
alpha = lam
else:
alpha = lam / (T2 - T1)
running_loss = 0.0
for images, _ in trainloader:
inputs = images.to(device)
optimizer.zero_grad()
latent_var = encoder(inputs)
_, centroids = kmeans(latent_var.detach())
outputs = decoder(latent_var)
l_rec = criterion1(inputs, outputs)
l_clt = criterion2(latent_var, centroids)
loss = l_rec + alpha * l_clt
loss.backward()
optimizer.step()
running_loss += loss.item()
avg_loss = running_loss / len(trainloader)
if ep % 10 == 0:
testacc = evaluation(testloader, encoder, kmeans, device)
print("[%d] Train loss: %.4f, Test Accuracy: %.3f" % (ep, avg_loss, testacc))
if avg_loss < ls:
ls = avg_loss
torch.save(encoder.state_dict(), "./models/dkm_en.pth")
torch.save(decoder.state_dict(), "./models/dkm_de.pth")
torch.save(kmeans.state_dict(), "./models/dkm_clt.pth")
encoder.load_state_dict(torch.load("./models/dkm_en.pth"))
decoder.load_state_dict(torch.load("./models/dkm_de.pth"))
kmeans.load_state_dict(torch.load("./models/dkm_clt.pth"))
with torch.no_grad():
encoder.eval()
for images, _ in testloader:
inputs = images.to(device)
latent_var = encoder(inputs)
outputs = decoder(latent_var)
input_samples = inputs.permute(0, 2, 3, 1).cpu().numpy()
reconstructed_samples = outputs.permute(0, 2, 3, 1).cpu().numpy()
break
columns = 10
rows = 5
print("Input images")
fig = plt.figure(figsize=(columns, rows))
for i in range(1, columns * rows + 1):
img = input_samples[i - 1]
fig.add_subplot(rows, columns, i)
plt.imshow(img.squeeze())
plt.axis("off")
plt.show()
print("Reconstruction images")
fig = plt.figure(figsize=(columns, rows))
for i in range(1, columns * rows + 1):
img = reconstructed_samples[i - 1]
fig.add_subplot(rows, columns, i)
plt.imshow(img.squeeze())
plt.axis("off")
plt.show()
결과는 다음과 같으며,
...
[260] Train loss: 0.0180, Test Accuracy: 0.860
[270] Train loss: 0.0180, Test Accuracy: 0.865
[280] Train loss: 0.0179, Test Accuracy: 0.862
[290] Train loss: 0.0179, Test Accuracy: 0.865
핵심 워크플로우는 다음과 같다.
- 데이터 준비:
- MNIST 데이터셋을 로드하고, 텐서 형식으로 변환하여 배치 단위로 처리.
- 모델 구성:
- Encoder: 이미지 데이터를 잠재 공간(latent space)으로 압축.
- Decoder: 잠재 공간에서 데이터를 원래 이미지 형태로 복원.
- K-means: 잠재 공간에서 중심점(centroids)을 학습하고, 클러스터 할당.
- 손실 함수 정의:
- 복원 손실: 원본 이미지와 복원된 이미지 간의 차이를 계산(MSE).
- 클러스터 손실: 잠재 공간 벡터와 중심점 간의 차이를 계산(MSE).
- 훈련 과정:
- 인코더와 디코더를 통해 입력 이미지를 잠재 공간으로 인코딩하고, 복원.
- k-평균 알고리즘으로 중심점 학습 및 클러스터 할당.
- 손실(복원 손실 + 클러스터 손실)을 계산하고 모델 업데이트.
- 평가:
- Hungarian 알고리즘으로 클러스터링 결과를 실제 레이블과 비교하여 정확도 계산.
- 시각화:
- 복원된 이미지와 원본 이미지 비교.
- 잠재 공간의 데이터를 TSNE로 2D 시각화하여 클러스터 분포 확인.
계속해서 코드를 뜯어보자.
필요한 라이브러리 불러오기 및 설정
# 필요한 라이브러리 불러오기
import torch # 딥러닝 모델 생성 및 학습을 위한 기본 라이브러리
import torch.nn as nn # 신경망 모듈을 포함하는 서브패키지
import torchvision # 딥러닝용 이미지 데이터셋과 전처리 도구 제공
from torchvision import transforms # 이미지 데이터 변환(전처리) 도구
import numpy as np # 수치 연산 및 배열 처리를 위한 라이브러리
from scipy.optimize import linear_sum_assignment as linear_assignment # Hungarian 알고리즘 구현
from sklearn.manifold import TSNE # 고차원 데이터를 2D 또는 3D로 시각화하기 위한 도구
from matplotlib import pyplot as plt # 데이터 시각화를 위한 도구
# GPU 또는 CPU 사용 설정
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") # GPU가 사용 가능하면 'cuda:0', 아니면 CPU를 사용
print(f"{device} is available.") # 사용 가능한 디바이스 출력
- PyTorch와 관련 패키지
- torch:
- 딥러닝 모델 생성과 학습을 위한 핵심 라이브러리이다.
- 텐서 연산(다차원 배열 연산), 자동 미분, GPU 가속 등을 제공한다.
- torch.nn:
- 신경망 레이어와 손실 함수 등의 구성 요소를 제공하는 모듈이다.
- 예: nn.Linear, nn.Conv2d, nn.ReLU 등.
- torchvision:
- PyTorch에 특화된 컴퓨터 비전 관련 라이브러리이다.
- 주요 기능:
- 데이터셋 로드: MNIST, CIFAR-10 등 유명 이미지 데이터셋 제공.
- 데이터 전처리: 이미지 크기 조정, 정규화 등의 변환 기능.
- transforms:
- 이미지 데이터를 변환하기 위한 유틸리티이다.
- 예: 이미지를 텐서로 변환하거나, 픽셀 값을 정규화하는 데 사용한다.
- torch:
- 기타 라이브러리
- numpy:
- 고성능의 수치 연산 라이브러리이다.
- 텐서 연산, 선형대수 계산, 배열 생성 및 변환에 사용한다.
- scipy.optimize.linear_sum_assignment:
- Hungarian 알고리즘을 구현한 함수이다.
- 클러스터링 결과와 실제 레이블 간 최적 매칭을 계산한다.
- sklearn.manifold.TSNE:
- T-SNE(T-Distributed Stochastic Neighbor Embedding) 알고리즘을 구현한 도구이다.
- 고차원 데이터를 저차원(보통 2D)으로 축소하여 시각화할 때 사용한다.
- matplotlib.pyplot:
- 데이터 시각화를 위한 라이브러리이다.
- 학습 결과(이미지 복원, 클러스터링 등)를 시각적으로 표현한다.
- numpy:
- 디바이스 설정
- torch.device:
- 학습에 사용할 디바이스를 정의한다.
- GPU를 사용할 수 있으면 'cuda:0'으로 설정하고, 그렇지 않으면 'cpu'로 설정한다.
- torch.cuda.is_available():
- 현재 시스템에서 GPU(CUDA)가 사용 가능한지 확인한다.
- GPU는 대규모 연산을 병렬 처리할 수 있어 학습 속도를 크게 향상시킨다.
- torch.device:
Hungarian 알고리즘
Hungarian 알고리즘이란?
- Hungarian 알고리즘은 최적 매칭 문제(Optimal Assignment Problem)를 푸는 알고리즘이다.
- 최적 매칭 문제는 작업과 작업자를 효율적으로 매칭하거나, 두 집합 간 매칭 비용을 최소화하는 문제를 다룬다.
- 예를 들어, n개의 작업과 n명의 작업자가 있을 때, 각 작업자마다 특정 작업을 수행하는 비용이 다르다면, 전체 비용을 최소화하는 매칭 방법을 찾는다.
Hungarian 알고리즘이 필요한 이유
- 비슷한 문제를 단순히 조합으로 풀려면 경우의 수가 너무 많아져 계산이 비효율적이다.
- 예: 5개의 작업과 5명의 작업자가 있다면, 가능한 매칭 조합은 5! = 120가지이다.
- Hungarian 알고리즘은 효율적으로 비용을 최소화하는 매칭을 계산한다.
Hungarian 알고리즘의 주요 개념
Hungarian 알고리즘은 비용 행렬(Cost Matrix)을 기반으로 작동한다.
- 비용 행렬은 다음과 같은 2D 배열로 표현된다:
작업자 A 작업자 B 작업자 C
작업 1 4 2 3
작업 2 2 3 4
작업 3 3 1 2
- 행(row)은 작업을 나타낸다.
- 열(column)은 작업자를 나타낸다.
- 각 값은 특정 작업자가 작업을 수행할 때의 비용이다.
목표는 이 비용 행렬에서 매칭 비용의 합이 최소가 되도록 작업과 작업자를 매칭하는 것이다.
Hungarian 알고리즘의 동작 원리
Hungarian 알고리즘은 다음의 4단계로 이루어진다:
- 행 감산(Row Reduction): 각 행에서 가장 작은 값을 빼 모든 행에서 최소값이 0이 되게 만든다.
원래 행렬: 행 감소 후:
4 2 3 2 0 1
2 3 4 --> 0 1 2
3 1 2 2 0 1
- 열 감산(Column Reduction): 각 열에서 가장 작은 값을 빼 모든 열에서 최소값이 0이 되게 만든다.
행 감소 후: 열 감소 후:
2 0 1 2 0 0
0 1 2 --> 0 1 1
2 0 1 2 0 0
- 선 긋기(Line Drawing)
- 0을 포함한 최소한의 행과 열에 선을 그어 모든 0을 덮는다.
- 만약 선의 개수가 행(또는 열)의 개수와 같다면, 매칭이 가능하다.
- 그렇지 않다면, 다음 단계로 진행한다.
- 행렬 수정(Matrix Adjustment)
- 선이 그어지지 않은 요소 중 가장 작은 값을 찾는다.
- 그 값을 선이 없는 모든 요소에서 빼고, 교차점에서는 더한다.
- 새로운 행렬로 돌아가 1~3단계를 반복한다.
Hungarian 알고리즘 결과
- 알고리즘은 비용 행렬에서 최적의 매칭(최소 비용)을 반환한다.
- 위 예에서는 작업 1→작업자 B, 작업 2→작업자 A, 작업 3→작업자 C가 최적 매칭이다.
코드 구현 예시
Python에서 Hungarian 알고리즘은 scipy.optimize.linear_sum_assignment 함수로 간단히 구현된다.
import numpy as np
from scipy.optimize import linear_sum_assignment
# 비용 행렬 정의
cost_matrix = np.array([
[4, 2, 3], # 작업 1의 비용
[2, 3, 4], # 작업 2의 비용
[3, 1, 2], # 작업 3의 비용
])
# Hungarian 알고리즘 적용
row_ind, col_ind = linear_sum_assignment(cost_matrix)
# 매칭 결과 출력
print("최적 매칭:")
for i, j in zip(row_ind, col_ind):
print(f"작업 {i+1} → 작업자 {j+1}")
# 총 최소 비용
total_cost = cost_matrix[row_ind, col_ind].sum()
print(f"총 최소 비용: {total_cost}")
결과 해석
출력
최적 매칭:
작업 1 → 작업자 2
작업 2 → 작업자 1
작업 3 → 작업자 3
총 최소 비용: 6
작업 1 → 작업자 2 (비용 2), 작업 2 → 작업자 1 (비용 2), 작업 3 → 작업자 3 (비용 2)로 매칭되어 총 비용이 최소(6)가 된다.
Hungarian 알고리즘은 복잡한 매칭 문제를 효율적으로 해결할 수 있어
클러스터링 평가, 작업 분배, 네트워크 라우팅 등 다양한 분야에서 활용된다.
왜 필요한가?
우리 코드에서 Hungarian 알고리즘이 필요한 이유는 클러스터링 결과와 실제 레이블 간의 매칭을 평가하기 위해서이다.
코드에서의 역할
- 클러스터링 평가:
- 모델이 데이터를 클러스터로 나눌 때, 각 클러스터 번호는 데이터의 실제 레이블과 순서가 다를 수 있다.
- 예: 모델이 0, 1, 2라는 클러스터를 생성했지만, 실제 레이블은 2, 0, 1일 수 있다.
- Hungarian 알고리즘은 클러스터 번호와 실제 레이블 간의 최적 매칭을 계산해, 모델의 정확도를 평가한다.
- 최적 매칭:
- Hungarian 알고리즘을 통해, 모델이 생성한 클러스터를 실제 레이블에 맞게 매칭한다.
- 이를 통해 클러스터링 결과의 품질을 객관적으로 측정할 수 있다.
간단한 예
- 예를 들어, 모델이 다음과 같이 클러스터링 결과를 반환했다고 가정하자:
- 실제 레이블: [0, 1, 2, 0, 1, 2]
- 클러스터: [2, 0, 1, 2, 0, 1]
- Hungarian 알고리즘은 클러스터 번호 2 → 레이블 0, 0 → 레이블 1, 1 → 레이블 2로 매칭하여, 결과를 정확히 비교할 수 있게 만든다.
결과
Hungarian 알고리즘 덕분에 클러스터링 성능을 정확히 평가할 수 있고, 모델의 개선 방향을 파악하는 데 도움을 준다.
데이터셋 불러오기 및 전처리
# 배치 크기를 설정한다. 한 번에 128개의 데이터를 학습에 사용할 수 있도록 설정한다.
batch_size = 128
# MNIST 데이터셋을 불러온다. 훈련용 데이터와 테스트용 데이터를 각각 가져온다.
trainset = torchvision.datasets.MNIST(
"./data/", # 데이터를 저장할 경로를 지정한다.
download=True, # 데이터가 없으면 다운로드한다.
train=True, # 훈련 데이터셋을 가져오도록 설정한다.
transform=transforms.ToTensor() # 이미지를 텐서로 변환한다.
)
testset = torchvision.datasets.MNIST(
"./data/", # 데이터를 저장할 경로를 지정한다.
download=True, # 데이터가 없으면 다운로드한다.
train=False, # 테스트 데이터셋을 가져오도록 설정한다.
transform=transforms.ToTensor() # 이미지를 텐서로 변환한다.
)
# 데이터를 배치 크기만큼 묶어서 제공하는 DataLoader를 생성한다.
trainloader = torch.utils.data.DataLoader(
trainset, # 훈련 데이터셋을 로드한다.
batch_size=batch_size, # 한 번에 128개의 데이터를 가져온다.
shuffle=True # 데이터를 섞어서 가져와 학습이 편향되지 않도록 한다.
)
testloader = torch.utils.data.DataLoader(
testset, # 테스트 데이터셋을 로드한다.
batch_size=batch_size, # 한 번에 128개의 데이터를 가져온다.
shuffle=False # 테스트 데이터는 섞지 않고 순서대로 가져온다.
)
- 배치 크기:
- batch_size는 한 번에 학습에 사용할 데이터의 개수를 의미한다.
- MNIST 데이터셋은 28x28 크기의 흑백 이미지로 이루어져 있어 한 배치에 128개의 이미지를 처리할 수 있다.
- 배치 크기를 설정하면 GPU/CPU 메모리 사용량과 학습 속도 간의 균형을 맞출 수 있다.
- MNIST 데이터셋:
- MNIST는 60,000개의 훈련 데이터와 10,000개의 테스트 데이터로 구성된다.
- 데이터는 손으로 작성된 숫자(0~9) 이미지와 그에 해당하는 라벨(정답)을 포함한다.
- train=True: 훈련 데이터셋을 가져온다.
- train=False: 테스트 데이터셋을 가져온다.
- download=True: 데이터셋이 로컬에 없으면 다운로드한다.
- transform=transforms.ToTensor():
- 이미지를 텐서(Tensor) 형식으로 변환한다.
- MNIST 데이터는 0~255 범위의 픽셀 값을 가진다. ToTensor는 이 값을 01~로 정규화하고, 데이터 타입을 PyTorch 텐서로 변경한다.
- 정규화된 데이터는 학습을 더 안정적으로 수행할 수 있도록 돕는다.
- DataLoader:
- DataLoader는 데이터를 효율적으로 로드하기 위한 도구이다.
- batch_size=batch_size: 설정된 크기(128)만큼 데이터를 묶어서 가져온다.
- shuffle=True: 데이터를 섞어서 가져온다. 이는 학습이 특정 순서에 의존하지 않도록 하는 데 중요하다.
- shuffle=False: 테스트 데이터는 순서를 유지한다. 테스트에서는 일정한 순서로 예측해야 비교가 쉽기 때문이다.
- trainloader와 testloader:
- trainloader: 훈련 데이터를 배치 단위로 제공한다.
- testloader: 테스트 데이터를 배치 단위로 제공한다.
- 이 두 로더를 사용하면 데이터셋을 반복적으로 처리할 수 있어 학습과 평가 루프에서 간편하게 사용할 수 있다.
주요 포인트
- MNIST 데이터셋은 딥러닝 입문 단계에서 가장 자주 사용하는 데이터셋이다.
- DataLoader는 메모리 효율성과 학습 안정성을 높여준다.
- 데이터 정규화와 배치 처리는 모델 학습 과정에서 중요한 기본 설정이다.
유틸리티 모듈 정의
class Flatten(torch.nn.Module):
def forward(self, x):
# 입력 텐서를 2D 형태 (배치 크기, 나머지 요소)로 변경한다.
# CNN 출력은 (배치 크기, 채널, 높이, 너비) 형태인데, 이를 Fully Connected Layer에 넣기 위해 1차원 벡터로 변환한다.
batch_size = x.shape[0] # 현재 배치 크기
return x.view(batch_size, -1) # 배치 크기를 유지한 채 나머지를 하나의 차원으로 합친다.
class Deflatten(nn.Module):
def __init__(self, k):
super(Deflatten, self).__init__()
self.k = k # k는 채널 수를 나타낸다.
def forward(self, x):
# 1차원 벡터 형태의 텐서를 다시 CNN 형식으로 변환한다.
s = x.size() # 입력 텐서의 크기. 예: (배치 크기, 채널*높이*너비)
feature_size = int((s[1] // self.k) ** 0.5) # 채널 외 나머지 요소를 가로, 세로 크기로 계산한다.
# 최종 출력은 (배치 크기, 채널(k), 높이, 너비) 형태가 된다.
return x.view(s[0], self.k, feature_size, feature_size)
Flatten
- 역할:
- 입력 데이터를 1차원 벡터로 변환한다.
- CNN 레이어는 출력 텐서를 (배치 크기, 채널, 높이, 너비) 형태로 반환한다.
- Fully Connected Layer는 1차원 벡터 형태의 입력을 필요로 하기 때문에 변환이 필요하다.
- 코드 작동:
- batch_size = x.shape[0]: 현재 입력 텐서의 배치 크기를 가져온다.
- x.view(batch_size, -1): 배치 크기는 유지한 채 나머지 차원을 하나로 병합한다.
- 예: (128, 16, 4, 4) → (128, 256).
- 예제:
- 입력: (128, 16, 4, 4) (배치 크기=128, 채널=16, 높이=4, 너비=4)
- 출력: (128, 256) (배치 크기=128, 나머지 요소=16×4×4=256)
Deflatten
- 역할:
- 1차원 벡터를 CNN 입력 형태로 변환한다.
- Fully Connected Layer 출력이나 잠재 공간 벡터를 다시 이미지 형태로 복원할 때 사용한다.
- 코드 작동:
- s = x.size(): 입력 텐서의 크기를 가져온다.
- 예: (128, 256).
- feature_size = int((s[1] // self.k) ** 0.5):
- 입력의 총 요소 수에서 채널 수 k를 나눈 뒤, 이를 정사각형 형태(가로=세로)로 변환한다.
- 예: 256 // 16 = 16, 가로와 세로는 각각 sqrt(16) = 4.
- x.view(s[0], self.k, feature_size, feature_size):
- 배치 크기와 채널 수를 유지하면서 가로, 세로 크기를 지정하여 CNN 입력 형태로 변환한다.
- 예: (128, 256) → (128, 16, 4, 4).
- s = x.size(): 입력 텐서의 크기를 가져온다.
- 예제:
- 입력: (128, 256) (배치 크기=128, 1차원 벡터=256)
- 채널 수 k = 16
- 출력: (128, 16, 4, 4) (배치 크기=128, 채널=16, 높이=4, 너비=4)
Flatten과 Deflatten의 관계
- Flatten은 이미지 데이터를 1차원으로 압축하여 Fully Connected Layer로 전달한다.
- Deflatten은 1차원으로 압축된 데이터를 다시 이미지 형식으로 복원하여 ConvTranspose2d와 같은 레이어로 전달한다.
- 이 둘은 오토인코더에서 인코더(Flatten)와 디코더(Deflatten)의 역할을 수행한다.
K-평균 알고리즘 클래스 정의
class Kmeans(nn.Module):
def __init__(self, num_clusters, latent_size):
super(Kmeans, self).__init__()
# 사용 가능한 디바이스 설정 (GPU가 없으면 CPU 사용)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
self.num_clusters = num_clusters # 클러스터 개수 설정
# 클러스터 중심점(centroids)을 학습 가능한 파라미터로 정의
self.centroids = nn.Parameter(
torch.rand((self.num_clusters, latent_size)).to(device)
)
def argminl2distance(self, a, b):
# 두 벡터 a와 b의 모든 L2 거리(유클리드 거리) 계산
# 각 중심점(b)와 a의 거리를 계산한 뒤, 최소값의 인덱스를 반환
return torch.argmin(torch.sum((a - b) ** 2, dim=1), dim=0)
def forward(self, x):
y_assign = [] # 각 입력 데이터의 클러스터 할당 결과를 저장할 리스트
for m in range(x.size(0)): # 배치 크기만큼 반복
# 현재 입력 데이터 x[m]을 클러스터 개수만큼 복사하여 확장
h = x[m].expand(self.num_clusters, -1)
# 확장된 데이터 h와 중심점 간의 L2 거리를 비교하여 가장 가까운 클러스터 인덱스를 할당
assign = self.argminl2distance(h, self.centroids)
# 해당 인덱스를 결과 리스트에 추가
y_assign.append(assign.item())
# 입력 데이터에 대한 클러스터 할당 결과(y_assign)와
# 할당된 클러스터의 중심점을 반환
return y_assign, self.centroids[y_assign]
초기화 메서드(생성자) __init__
def __init__(self, num_clusters, latent_size):
super(Kmeans, self).__init__()
# 사용 가능한 디바이스 설정 (GPU가 없으면 CPU 사용)
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
self.num_clusters = num_clusters # 클러스터 개수 설정
# 클러스터 중심점(centroids)을 학습 가능한 파라미터로 정의
self.centroids = nn.Parameter(
torch.rand((self.num_clusters, latent_size)).to(device)
)
- num_clusters와 latent_size
- 클러스터 개수와 데이터의 잠재 공간 크기를 인자로 받는다.
- 클러스터 개수는 데이터를 몇 개의 그룹으로 나눌지를 결정한다.
- 잠재 공간 크기는 오토인코더에서 출력된 데이터의 차원을 의미한다.
- centroids
- 클러스터 중심점(centroids)은 데이터를 군집화할 때 기준이 되는 점이다.
- 이 중심점은 학습 가능한 파라미터로 설정되어, 학습 중에 데이터 분포에 맞게 업데이트된다.
- 초기 값은 torch.rand를 사용해 무작위로 설정한다.
거리 계산 메서드 argminl2distance
def argminl2distance(self, a, b):
# 두 벡터 a와 b의 모든 L2 거리(유클리드 거리) 계산
# 각 중심점(b)와 a의 거리를 계산한 뒤, 최소값의 인덱스를 반환
return torch.argmin(torch.sum((a - b) ** 2, dim=1), dim=0)
- 역할
- 입력 벡터 a와 클러스터 중심점 b 사이의 L2 거리(유클리드 거리)를 계산하고, 최소 거리의 인덱스를 반환한다.
- 구현 과정
- (a - b) ** 2: 입력 벡터와 중심점 간의 차이를 제곱하여 거리 계산의 기초를 만든다.
- torch.sum(..., dim=1): 중심점의 각 차원에 대해 거리 값을 합산하여 최종 거리를 구한다.
- torch.argmin(..., dim=0): 계산된 거리들 중 최소값의 인덱스를 반환한다.
순전파 메서드 forward
def forward(self, x):
y_assign = [] # 각 입력 데이터의 클러스터 할당 결과를 저장할 리스트
for m in range(x.size(0)): # 배치 크기만큼 반복
# 현재 입력 데이터 x[m]을 클러스터 개수만큼 복사하여 확장
h = x[m].expand(self.num_clusters, -1)
# 확장된 데이터 h와 중심점 간의 L2 거리를 비교하여 가장 가까운 클러스터 인덱스를 할당
assign = self.argminl2distance(h, self.centroids)
# 해당 인덱스를 결과 리스트에 추가
y_assign.append(assign.item())
# 입력 데이터에 대한 클러스터 할당 결과(y_assign)와
# 할당된 클러스터의 중심점을 반환
return y_assign, self.centroids[y_assign]
- 입력
- 입력 텐서 x: 배치 크기(batch_size)와 잠재 공간 크기(latent_size)를 가지는 텐서.
- 작동 방식
- 반복문으로 각 데이터 처리
- 배치 크기만큼 반복하여 각 데이터를 클러스터에 할당한다.
- 데이터 복제 및 확장
- x[m].expand(self.num_clusters, -1): 현재 데이터(x[m])를 클러스터 개수만큼 복사한다. 이렇게 하면 모든 중심점과 비교할 준비가 된다.
- 클러스터 할당
- argminl2distance: 데이터와 모든 중심점 사이의 거리를 계산하여 가장 가까운 클러스터의 인덱스를 반환한다.
- 결과 저장
- 클러스터 할당 결과를 리스트(y_assign)에 추가한다.
- 반복문으로 각 데이터 처리
- 출력
- y_assign: 입력 데이터가 할당된 클러스터 인덱스 리스트.
- self.centroids[y_assign]: 할당된 클러스터의 중심점.
요약
- Kmeans 클래스는 학습 가능한 클러스터 중심점을 정의하고, 입력 데이터를 가장 가까운 클러스터에 할당한다.
- argminl2distance 메서드는 중심점과의 거리 계산을 담당한다.
- 클러스터링 결과는 클러스터 인덱스와 할당된 중심점으로 반환된다.
인코더와 디코더 정의
# 인코더 정의
class Encoder(nn.Module):
def __init__(self, latent_size):
super(Encoder, self).__init__()
k = 16 # 첫 번째 Conv 레이어의 출력 채널 수
self.encoder = nn.Sequential(
nn.Conv2d(1, k, 3, stride=2), # 입력 채널: 1 (흑백 이미지), 출력 채널: k
nn.ReLU(), # 비선형 활성화 함수
nn.Conv2d(k, 2 * k, 3, stride=2), # 출력 채널 수를 두 배로 늘림
nn.ReLU(),
nn.Conv2d(2 * k, 4 * k, 3, stride=1), # 출력 채널 수를 다시 두 배로 늘림
nn.ReLU(),
Flatten(), # 2D 이미지 데이터를 1D 벡터로 변환
nn.Linear(1024, latent_size), # 잠재 공간(latent space)으로 매핑
nn.ReLU(),
)
def forward(self, x):
return self.encoder(x)
# 디코더 정의
class Decoder(nn.Module):
def __init__(self, latent_size):
super(Decoder, self).__init__()
k = 16 # ConvTranspose 레이어의 기본 출력 채널 수
self.decoder = nn.Sequential(
nn.Linear(latent_size, 1024), # 잠재 공간에서 1024차원으로 확장
nn.ReLU(),
Deflatten(4 * k), # 1D 벡터를 2D 형태로 복원
nn.ConvTranspose2d(4 * k, 2 * k, 3, stride=1), # 업샘플링 수행
nn.ReLU(),
nn.ConvTranspose2d(2 * k, k, 3, stride=2), # 다시 업샘플링
nn.ReLU(),
nn.ConvTranspose2d(k, 1, 3, stride=2, output_padding=1), # 최종적으로 입력 크기 복원
nn.Sigmoid(), # 출력 값을 [0, 1] 범위로 제한
)
def forward(self, x):
return self.decoder(x)
인코더
class Encoder(nn.Module):
def __init__(self, latent_size):
super(Encoder, self).__init__()
k = 16 # 첫 번째 Conv 레이어의 출력 채널 수
self.encoder = nn.Sequential(
nn.Conv2d(1, k, 3, stride=2), # 입력 채널: 1 (흑백 이미지), 출력 채널: k
nn.ReLU(), # 비선형 활성화 함수
nn.Conv2d(k, 2 * k, 3, stride=2), # 출력 채널 수를 두 배로 늘림
nn.ReLU(),
nn.Conv2d(2 * k, 4 * k, 3, stride=1), # 출력 채널 수를 다시 두 배로 늘림
nn.ReLU(),
Flatten(), # 2D 이미지 데이터를 1D 벡터로 변환
nn.Linear(1024, latent_size), # 잠재 공간(latent space)으로 매핑
nn.ReLU(),
)
def forward(self, x):
return self.encoder(x)
역할: 입력 이미지 데이터를 압축된 형태의 잠재 공간(latent space)으로 매핑한다.
- 구조:
- Conv2d 레이어:
- 이미지의 중요한 특징(엣지, 패턴 등)을 추출한다.
- 첫 번째 Conv2d: 입력 이미지 크기를 줄이며 16개의 필터로 특징을 추출한다.
- 두 번째 Conv2d: 출력 채널을 32개로 늘려 더 복잡한 패턴을 학습한다.
- 세 번째 Conv2d: 출력 채널을 64개로 늘려 고수준 특징을 학습한다.
- Flatten:
- 2D 이미지 데이터를 1D 벡터로 변환한다.
- Fully Connected Layer로 전달하기 위함이다.
- Linear 레이어:
- 고차원 벡터를 낮은 차원의 잠재 공간으로 압축한다.
- ReLU 활성화 함수:
- 각 레이어의 출력에 비선형성을 부여한다.
- Conv2d 레이어:
디코더
class Decoder(nn.Module):
def __init__(self, latent_size):
super(Decoder, self).__init__()
k = 16 # ConvTranspose 레이어의 기본 출력 채널 수
self.decoder = nn.Sequential(
nn.Linear(latent_size, 1024), # 잠재 공간에서 1024차원으로 확장
nn.ReLU(),
Deflatten(4 * k), # 1D 벡터를 2D 형태로 복원
nn.ConvTranspose2d(4 * k, 2 * k, 3, stride=1), # 업샘플링 수행
nn.ReLU(),
nn.ConvTranspose2d(2 * k, k, 3, stride=2), # 다시 업샘플링
nn.ReLU(),
nn.ConvTranspose2d(k, 1, 3, stride=2, output_padding=1), # 최종적으로 입력 크기 복원
nn.Sigmoid(), # 출력 값을 [0, 1] 범위로 제한
)
def forward(self, x):
return self.decoder(x)
역할: 잠재 공간에서 데이터를 복원하여 원래의 이미지 크기와 형태로 재구성한다.
- 구조:
- Linear 레이어:
- 잠재 공간에서 데이터를 고차원 벡터로 확장한다.
- Deflatten:
- 1D 벡터를 2D 텐서로 변환하여 이미지 형태로 복원한다.
- ConvTranspose2d 레이어:
- 업샘플링을 통해 이미지 크기를 점진적으로 복원한다.
- stride=2로 설정하면 크기를 두 배로 확장한다.
- 마지막 ConvTranspose2d는 원래 크기의 흑백 이미지를 생성한다.
- Sigmoid 활성화 함수:
- 픽셀 값을 [0, 1]로 정규화한다.
- Linear 레이어:
작동 과정
- 인코더는 입력 이미지를 잠재 공간으로 압축하여 주요 정보를 유지하면서도 데이터 크기를 줄인다.
- 디코더는 잠재 공간 정보를 사용해 원래 이미지를 최대한 정확히 복원하려 한다.
- 이를 통해 데이터의 효율적 표현과 복원 능력을 동시에 학습한다.
클러스터링 정확도 계산 함수
def cluster_acc(y_true, y_pred):
# y_true와 y_pred를 numpy 배열로 변환한다
y_true = np.array(y_true)
y_pred = np.array(y_pred)
# 최대 레이블 값을 기준으로 정방행렬 크기를 설정한다
D = max(y_pred.max(), y_true.max()) + 1
# D x D 크기의 0으로 초기화된 정방행렬 w를 생성한다
w = np.zeros((D, D), dtype=np.int64)
# y_pred와 y_true를 비교하며 각 매칭 횟수를 w 행렬에 기록한다
for i in range(y_pred.size):
w[y_pred[i], y_true[i]] += 1
# Hungarian 알고리즘으로 최적 매칭을 계산한다
ind = linear_assignment(w.max() - w)
# 최적 매칭에 해당하는 값을 합산하여 정확도를 계산한다
return sum([w[i, j] for i, j in zip(ind[0], ind[1])]) * 1.0 / y_pred.size
y_true와 y_pred를 배열로 변환
y_true = np.array(y_true)
y_pred = np.array(y_pred)
- y_true: 데이터의 실제 레이블.
- y_pred: 클러스터링 알고리즘이 예측한 레이블.
- numpy 배열로 변환하여 효율적인 연산을 수행한다.
정방행렬 크기 설정
D = max(y_pred.max(), y_true.max()) + 1
- D는 레이블 값의 최대치 + 1로 설정한다.
- 이는 실제 레이블(y_true)과 예측 레이블(y_pred)을 모두 포함하는 정방행렬을 생성하기 위함이다.
행렬 w 초기화
w = np.zeros((D, D), dtype=np.int64)
- D x D 크기의 행렬을 0으로 초기화한다.
- w[i][j]는 클러스터 i가 실제 레이블 j와 매칭된 횟수를 저장한다.
매칭 횟수 기록
for i in range(y_pred.size):
w[y_pred[i], y_true[i]] += 1
- 예측된 레이블과 실제 레이블을 비교하여 매칭 횟수를 행렬 w에 기록한다.
- 예를 들어, 예측 레이블이 2이고 실제 레이블이 3인 경우 w[2][3]의 값을 1 증가시킨다.
Hungarian 알고리즘으로 최적 매칭 찾기
ind = linear_assignment(w.max() - w)
- Hungarian 알고리즘을 사용하여 비용이 가장 낮은 매칭을 찾는다.
- 여기서 w.max() - w를 사용하는 이유는 Hungarian 알고리즘이 최소 비용을 찾는 데 사용되기 때문이다.
- 매칭 결과는 ind로 반환되며, 이는 최적 매칭을 나타낸다.
정확도 계산
return sum([w[i, j] for i, j in zip(ind[0], ind[1])]) * 1.0 / y_pred.size
- ind[0]과 ind[1]은 각각 Hungarian 알고리즘으로 매칭된 행과 열의 인덱스이다.
- 이 인덱스에 해당하는 w[i, j] 값을 합산하여 정확도를 계산한다.
- 전체 데이터 크기 y_pred.size로 나누어 비율로 반환한다.
Hungarian 알고리즘의 역할
- Hungarian 알고리즘은 클러스터 번호와 실제 레이블 간의 최적 매칭을 찾는다.
- 클러스터 번호는 순서나 의미가 없으므로, Hungarian 알고리즘을 통해 재정렬하여 평가 지표를 공정하게 계산한다.
훈련 및 평가 루프
for ep in range(300):
# alpha 값을 설정한다. alpha는 클러스터링 손실(l_clt)의 가중치를 조절하는 변수이다.
if (ep > T1) and (ep < T2):
# 학습 초기 구간(T1 ~ T2)에서는 alpha가 점진적으로 증가한다.
alpha = lam * (ep - T1) / (T2 - T1)
elif ep >= T2:
# 학습이 충분히 진행된 이후에는 alpha를 고정된 값 lam으로 설정한다.
alpha = lam
else:
# 초기 구간(T1 이전)에서는 alpha를 작은 값으로 설정한다.
alpha = lam / (T2 - T1)
running_loss = 0.0 # 에포크마다 누적 손실 값을 초기화한다.
for images, _ in trainloader: # 배치 단위로 데이터를 처리한다.
inputs = images.to(device) # 입력 데이터를 GPU/CPU로 이동한다.
optimizer.zero_grad() # 이전 배치에서 계산된 기울기를 초기화한다.
# 인코더를 통해 입력 데이터를 잠재 공간(latent space)으로 변환한다.
latent_var = encoder(inputs)
# k-평균 알고리즘으로 잠재 공간 데이터를 클러스터 중심에 할당한다.
_, centroids = kmeans(latent_var.detach()) # detach()로 kmeans 파라미터가 업데이트되지 않게 한다.
# 디코더를 사용해 잠재 공간 데이터를 원래의 이미지 형태로 복원한다.
outputs = decoder(latent_var)
# 복원 손실 계산: 입력 데이터와 복원된 출력 데이터 간의 차이를 Mean Squared Error로 계산한다.
l_rec = criterion1(inputs, outputs)
# 클러스터 손실 계산: 잠재 공간 벡터와 할당된 클러스터 중심 간의 차이를 Mean Squared Error로 계산한다.
l_clt = criterion2(latent_var, centroids)
# 총 손실 계산: 복원 손실(l_rec)과 클러스터 손실(l_clt)을 가중치(alpha)로 결합한다.
loss = l_rec + alpha * l_clt
# 역전파(backpropagation)를 통해 기울기를 계산하고 가중치를 업데이트한다.
loss.backward()
optimizer.step()
# 배치 손실 값을 누적하여 에포크 손실을 계산한다.
running_loss += loss.item()
# 에포크 손실의 평균값을 계산한다.
avg_loss = running_loss / len(trainloader)
# 10 에포크마다 테스트 정확도를 출력한다.
if ep % 10 == 0:
# 테스트셋을 사용해 클러스터링 정확도를 평가한다.
testacc = evaluation(testloader, encoder, kmeans, device)
# 에포크 번호, 학습 손실, 테스트 정확도를 출력한다.
print("[%d] Train loss: %.4f, Test Accuracy: %.3f" % (ep, avg_loss, testacc))
# 평균 손실(avg_loss)이 현재 저장된 최소 손실(ls)보다 작으면 모델 상태를 저장한다.
if avg_loss < ls:
ls = avg_loss # 최소 손실 값을 업데이트한다.
# 인코더의 학습된 가중치를 ./models/dkm_en.pth 파일로 저장한다.
torch.save(encoder.state_dict(), "./models/dkm_en.pth")
# 디코더의 학습된 가중치를 ./models/dkm_de.pth 파일로 저장한다.
torch.save(decoder.state_dict(), "./models/dkm_de.pth")
# k-평균 알고리즘의 중심점(centroids)을 ./models/dkm_clt.pth 파일로 저장한다.
torch.save(kmeans.state_dict(), "./models/dkm_clt.pth")
# 저장된 인코더 가중치를 ./models/dkm_en.pth 파일에서 로드한다.
encoder.load_state_dict(torch.load("./models/dkm_en.pth"))
# 저장된 디코더 가중치를 ./models/dkm_de.pth 파일에서 로드한다.
decoder.load_state_dict(torch.load("./models/dkm_de.pth"))
# 저장된 k-평균 중심점 정보를 ./models/dkm_clt.pth 파일에서 로드한다.
kmeans.load_state_dict(torch.load("./models/dkm_clt.pth"))
- alpha 값 설정
- alpha는 클러스터링 손실(l_clt)에 대한 가중치를 의미한다. 학습 초반에는 복원 손실(l_rec)에 더 집중하고, 학습이 진행되면서 클러스터링 손실의 중요도를 점진적으로 증가시킨다.
- T1 ~ T2 구간에서는 alpha가 선형적으로 증가한다. 이는 초기에는 오토인코더의 재구성 성능을 우선적으로 높이고, 이후에 클러스터링을 점진적으로 학습하도록 유도하기 위함이다.
- 입력 데이터 처리
- 배치 단위로 데이터를 가져오고, GPU 또는 CPU로 전송한다.
- optimizer.zero_grad()를 호출하여 이전 배치에서 계산된 기울기를 초기화한다.
- 잠재 공간 변환 및 클러스터링
- 인코더를 사용해 이미지를 압축된 잠재 벡터로 변환한다.
- 잠재 벡터를 kmeans를 통해 클러스터에 할당하고, 할당된 클러스터 중심을 가져온다.
- 이때 latent_var.detach()를 사용하여 k-평균 알고리즘이 역전파에 영향을 받지 않도록 한다.
- 복원
- 디코더를 통해 잠재 벡터를 원래 이미지 형태로 복원한다.
- 손실 계산
- l_rec: 입력 이미지와 복원된 이미지 간의 Mean Squared Error. 오토인코더의 재구성 성능을 평가한다.
- l_clt: 잠재 벡터와 할당된 클러스터 중심 간의 Mean Squared Error. 클러스터링 성능을 평가한다.
- loss: 두 손실을 결합하여 최종 손실로 사용한다. alpha는 클러스터링 손실의 중요도를 조절한다.
- 역전파 및 가중치 업데이트
- loss.backward()를 호출하여 역전파를 수행한다.
- optimizer.step()을 통해 모델의 가중치를 업데이트한다.
- 평균 손실 계산 및 평가
- running_loss에 각 배치의 손실을 누적하여 에포크 평균 손실(avg_loss)을 계산한다.
- 10 에포크마다 테스트 데이터셋에서 클러스터링 정확도를 평가하고, 학습 손실과 함께 출력한다.
- 모델 저장
- 조건부 저장:
- 학습 중 손실(avg_loss)이 이전까지 기록된 최소 손실(ls)보다 작으면, 해당 모델 상태를 저장한다.
- 이는 모델이 점진적으로 더 나은 성능을 달성할 때만 저장하도록 하여 저장 파일을 효율적으로 관리한다.
- torch.save:
- state_dict()는 PyTorch 모델의 학습 가능한 파라미터(가중치와 바이어스)를 딕셔너리 형태로 반환한다.
- 이를 사용해 각 모델의 상태를 .pth 파일에 저장한다.
- 저장 경로는 상대 경로 "./models/"로 지정되어 있으며, 파일 이름은 모델별로 다르다:
- 인코더: "dkm_en.pth"
- 디코더: "dkm_de.pth"
- KMeans: "dkm_clt.pth"
- 저장 이유:
- 학습이 완료된 후에도 모델을 재사용하거나, 추론(inference)을 수행할 때 동일한 상태를 복원하기 위해 저장한다.
- 조건부 저장:
- 모델 로드
- torch.load:
- 저장된 .pth 파일을 읽어와 모델의 상태를 복원한다.
- 저장된 모델 상태를 로드하면 학습을 이어가거나, 새로운 데이터에 대해 결과를 생성할 수 있다.
- load_state_dict:
- 로드된 상태 딕셔너리를 모델에 적용하여 저장 당시와 동일한 가중치로 초기화한다.
- 로딩 순서:
- encoder.load_state_dict: 인코더 가중치를 복원한다.
- decoder.load_state_dict: 디코더 가중치를 복원한다.
- kmeans.load_state_dict: KMeans 알고리즘의 중심점(centroids)을 복원한다.
- torch.load:
손실 함수의 역할
- 복원 손실 (l_rec):
- 오토인코더가 입력 이미지를 얼마나 잘 복원하는지를 측정한다.
- 입력과 출력 간의 차이가 클수록 손실이 커지며, 이는 모델이 이미지를 잘 복원하지 못했다는 것을 의미한다.
- 클러스터 손실 (l_clt):
- 잠재 공간에서 데이터가 적절한 클러스터에 할당되었는지를 평가한다.
- 데이터가 클러스터 중심에서 멀리 떨어져 있을수록 손실이 커진다. 이는 모델이 데이터를 클러스터 중심에 잘 맞추지 못했음을 나타낸다.
복원된 이미지 시각화
# with torch.no_grad(): 역전파를 비활성화하여 메모리 사용을 줄이고 평가에만 집중한다.
# encoder.eval(): 모델을 평가 모드로 전환하여 Dropout과 BatchNorm과 같은 레이어가 학습 중 동작하지 않게 한다.
with torch.no_grad():
encoder.eval() # 평가 모드로 전환
for images, _ in testloader:
# 테스트 데이터의 배치를 GPU/CPU에 로드한다.
inputs = images.to(device)
# 입력 이미지를 인코더를 통해 잠재 공간(latent space)으로 변환한다.
latent_var = encoder(inputs)
# 잠재 공간에서 디코더를 통해 이미지를 복원한다.
outputs = decoder(latent_var)
# 입력 이미지를 (N, C, H, W) -> (N, H, W, C)로 변환하여 시각화에 적합한 형식으로 변경한다.
input_samples = inputs.permute(0, 2, 3, 1).cpu().numpy()
# 복원된 이미지를 동일한 형식으로 변환한다.
reconstructed_samples = outputs.permute(0, 2, 3, 1).cpu().numpy()
break # 첫 번째 배치만 사용한다.
# 시각화: 입력 이미지
columns = 10
rows = 5
print("Input images")
fig = plt.figure(figsize=(columns, rows))
for i in range(1, columns * rows + 1):
img = input_samples[i - 1] # 첫 번째 배치의 이미지를 하나씩 가져온다.
fig.add_subplot(rows, columns, i) # 행렬 구조로 이미지를 배치한다.
plt.imshow(img.squeeze()) # 채널 차원이 1인 경우 제거한다.
plt.axis("off") # 축을 숨겨 이미지만 강조한다.
plt.show()
# 시각화: 복원된 이미지
print("Reconstruction images")
fig = plt.figure(figsize=(columns, rows))
for i in range(1, columns * rows + 1):
img = reconstructed_samples[i - 1] # 복원된 이미지에서 가져온다.
fig.add_subplot(rows, columns, i) # 행렬 구조로 복원 이미지를 배치한다.
plt.imshow(img.squeeze()) # 채널 차원이 1인 경우 제거한다.
plt.axis("off") # 축을 숨겨 복원 이미지만 강조한다.
plt.show()
- with torch.no_grad()와 encoder.eval()
- with torch.no_grad():
- 평가 과정에서 역전파가 필요 없으므로, 이를 비활성화하여 메모리 사용량을 줄인다.
- 특히 GPU를 사용할 때 메모리 절약 효과가 크다.
- encoder.eval():
- 모델을 평가 모드로 전환한다.
- 평가 모드에서는 Dropout과 Batch Normalization이 고정된 값으로 작동하여 일관된 결과를 보장한다.
- with torch.no_grad():
- 데이터 변환 및 잠재 공간 활용
- inputs = images.to(device):
- 테스트 배치 데이터를 GPU 또는 CPU로 로드한다.
- latent_var = encoder(inputs):
- 입력 이미지를 압축된 잠재 공간(latent space)으로 변환한다.
- outputs = decoder(latent_var):
- 잠재 공간 데이터를 복원하여 원래의 이미지 형태로 만든다.
- inputs = images.to(device):
- permute로 차원 변경
- 딥러닝 모델의 출력은 (N, C, H, W) 형식이지만, matplotlib는 (N, H, W, C) 형식의 데이터를 필요로 한다.
- inputs.permute(0, 2, 3, 1):
- 입력 이미지 텐서의 차원을 변경한다.
- outputs.permute(0, 2, 3, 1):
- 복원된 이미지 텐서의 차원을 동일한 방식으로 변경한다.
- 시각화
- 입력 이미지 시각화:
- plt.imshow: 이미지를 화면에 출력한다.
- plt.axis("off"): 축을 숨겨 이미지에 집중할 수 있도록 한다.
- 복원된 이미지 시각화:
- 입력 이미지와 동일한 방식으로 복원된 이미지를 출력한다.
- 원본과 복원본을 비교해 모델의 성능을 직관적으로 평가할 수 있다.
- 입력 이미지 시각화:
결과
- 입력 이미지는 테스트 데이터셋에서 가져온 원본 이미지이다.
- 복원 이미지는 디코더를 통해 잠재 공간에서 다시 생성된 이미지이다.
- 입력과 복원 이미지를 나란히 비교하면 오토인코더가 원본 데이터를 얼마나 잘 재구성했는지 확인할 수 있다.
클러스터링 평가 및 시각화
# 클러스터링 평가 및 시각화
predictions = [] # 클러스터링 결과를 저장할 리스트
actual = [] # 실제 레이블(정답)을 저장할 리스트
latent_features = [] # 잠재 공간(latent space)에서 추출된 특징을 저장할 리스트
with torch.no_grad(): # 평가 과정에서는 역전파를 비활성화해 메모리 사용을 줄임
for images, labels in testloader: # 테스트 데이터셋의 모든 배치에 대해 반복
inputs = images.to(device) # 이미지를 GPU 또는 CPU로 이동
labels = labels.to(device) # 레이블(정답)을 GPU 또는 CPU로 이동
latent_var = encoder(inputs) # 인코더를 사용해 잠재 공간 표현(latent variable)을 추출
y_pred, _ = kmeans(latent_var) # 잠재 공간에서 k-평균 클러스터링을 수행
predictions += y_pred # 예측 결과를 리스트에 추가
latent_features += latent_var.cpu().tolist() # 잠재 변수 값을 리스트로 변환 후 저장
actual += labels.cpu().tolist() # 실제 레이블을 리스트로 변환 후 저장
# Hungarian 알고리즘을 사용해 클러스터링 정확도를 계산
print(cluster_acc(actual, predictions))
- predictions, actual, latent_features 초기화:
- predictions: 클러스터링 결과(예측된 클러스터 ID)를 저장한다.
- actual: 실제 레이블(데이터셋에서 제공하는 정답)을 저장한다.
- latent_features: 잠재 공간에서 추출된 고차원 특징 벡터를 저장한다.
- with torch.no_grad():
- 학습이 아닌 평가 단계이므로 torch.no_grad()를 사용해 역전파(gradient computation)를 비활성화한다.
- 이로 인해 메모리와 계산 자원을 절약할 수 있다.
- 테스트 데이터셋 반복:
- testloader는 배치 단위로 데이터를 제공한다.
- 각 배치에서 images는 입력 데이터, labels는 정답 레이블이다.
- 데이터를 GPU 또는 CPU로 이동한 후, 모델에 입력한다.
- 인코더를 통해 잠재 공간 표현 생성:
- latent_var = encoder(inputs):
- 인코더를 사용해 입력 데이터를 압축된 잠재 공간 표현으로 변환한다.
- 잠재 공간은 고차원 데이터(28x28 이미지)를 저차원 벡터로 변환해 클러스터링 효율성을 높인다.
- latent_var = encoder(inputs):
- k-평균 클러스터링 수행:
- y_pred, _ = kmeans(latent_var):
- 잠재 공간 표현(latent_var)에서 k-평균 알고리즘을 적용해 각 데이터 포인트가 속한 클러스터 ID(y_pred)를 계산한다.
- y_pred, _ = kmeans(latent_var):
- 결과 저장:
- predictions: 클러스터 ID를 리스트에 추가한다.
- latent_features: 잠재 공간에서 추출된 특징 벡터를 리스트에 추가한다.
- actual: 실제 레이블을 리스트에 추가한다.
- Hungarian 알고리즘을 이용한 클러스터링 평가:
- print(cluster_acc(actual, predictions)):
- cluster_acc 함수는 실제 레이블과 클러스터링 결과를 비교해 정확도를 계산한다.
- Hungarian 알고리즘은 클러스터와 실제 레이블 간의 최적 매핑을 계산해 정확도를 최대화한다.
Hungarian 알고리즘이 중요한 이유
- 클러스터링은 무작위로 번호가 할당되기 때문에, 클러스터 ID와 실제 레이블 번호가 일치하지 않을 수 있다.
- Hungarian 알고리즘은 이 번호를 매핑하여 최적의 정확도를 계산한다.
- 예를 들어, 실제 레이블 0이 클러스터 2에 할당되었다면 이를 교정하여 비교한다.
결과적으로 얻는 값
- predictions: k-평균 클러스터링 결과 (각 데이터의 클러스터 ID).
- actual: 데이터셋의 실제 레이블.
- cluster_acc: Hungarian 알고리즘을 기반으로 계산된 클러스터링 정확도.
잠재 공간 시각화
# TSNE를 사용해 잠재 공간(latent space)을 2차원으로 시각화하는 코드이다.
# 잠재 공간의 클러스터링 구조를 파악하고, 데이터의 분리 정도를 시각적으로 확인하기 위해 사용한다.
# TSNE 객체 생성: 잠재 공간의 고차원 데이터를 2차원으로 변환한다.
tsne = TSNE(n_components=2, random_state=0)
# `latent_features`를 numpy 배열로 변환한 뒤 TSNE를 적용해 2차원으로 축소한다.
cluster = np.array(tsne.fit_transform(np.array(latent_features)))
# 실제 라벨을 numpy 배열로 변환한다.
actual = np.array(actual)
# 시각화를 위한 플롯 설정: 그래프 크기를 10x10으로 설정한다.
plt.figure(figsize=(10, 10))
# MNIST 데이터의 0~9 숫자 각각에 대해 시각화한다.
mnist = range(10) # 숫자 라벨 범위를 0~9로 설정한다.
for i, label in zip(range(10), mnist):
# 현재 숫자(`i`)에 해당하는 데이터의 인덱스를 가져온다.
idx = np.where(actual == i)
# 클러스터 데이터를 x, y 좌표로 시각화한다.
plt.scatter(cluster[idx, 0], cluster[idx, 1], marker=".", label=str(label))
# 범례를 추가해 각 숫자에 해당하는 데이터 클러스터를 구분한다.
plt.legend()
# 플롯을 화면에 출력한다.
plt.show()
- TSNE 생성 및 데이터 변환
- TSNE(n_components=2, random_state=0):
- n_components=2: TSNE를 통해 데이터를 2차원으로 변환한다.
- random_state=0: 재현성을 보장하기 위해 난수 시드를 고정한다.
- fit_transform:
- latent_features(잠재 공간)는 현재 고차원 데이터(10차원)로 구성되어 있다.
- TSNE는 데이터를 저차원으로 변환하되, 데이터 간의 상대적인 거리(구조)를 최대한 유지한다.
- 출력은 각 데이터 포인트의 2차원 좌표로 구성된다.
- 시각화를 위한 준비
- actual = np.array(actual):
- 실제 레이블(0~9)을 numpy 배열로 변환해 인덱싱 및 필터링에 사용한다.
- plt.figure(figsize=(10, 10)):
- 플롯 크기를 10x10으로 설정한다. 클러스터가 많아 보일 수 있도록 적절한 크기를 설정한다.
- actual = np.array(actual):
- TSNE(n_components=2, random_state=0):
데이터 분리 및 시각화
클러스터 데이터 필터링
idx = np.where(actual == i)
- actual == i 조건을 만족하는 데이터의 인덱스를 찾는다.
- idx는 현재 레이블(i)에 해당하는 데이터의 위치를 나타낸다.
데이터 시각화
plt.scatter(cluster[idx, 0], cluster[idx, 1], marker=".", label=str(label))
- cluster[idx, 0]과 cluster[idx, 1]은 각각 x, y 좌표이다.
- 해당 레이블에 해당하는 데이터 포인트를 산점도로 표시한다.
범례 추가
plt.legend()
- 각 숫자(0~9)에 해당하는 클러스터를 색상과 라벨로 구분한다.
플롯 출력
plt.show()
- 2차원 공간에 표시된 클러스터를 화면에 출력한다.
출력 그래프의 해석
- 클러스터 구조:
- 각 숫자(0~9)에 해당하는 데이터가 서로 다른 클러스터로 분리되어야 한다.
- 클러스터가 밀집되어 있고, 다른 클러스터와 겹치지 않을수록 클러스터링 성능이 좋다고 평가할 수 있다.
- 오차 해석:
- 클러스터가 겹치는 경우, 모델이 해당 숫자를 구별하는 데 어려움을 겪는다는 의미이다.
- 예를 들어, 숫자 3과 8이 겹친다면, 잠재 공간에서 이 두 숫자의 특징이 비슷하게 인코딩되었음을 나타낸다.
- TSNE의 한계:
- TSNE는 고차원 공간의 상대적 구조를 최대한 유지하려 하지만, 완벽히 보존하지는 않는다.
- 따라서 클러스터 간 거리가 실제 고차원 공간의 거리를 완벽히 반영하지 않을 수 있다.
결론
이렇게 해서 오토인코더와 K-평균 알고리즘을 결합한 깊은 K-평균 알고리즘 코드에 대해 무사히 알아보았다.
중간에 다른 길로 새는 바람에 시간이 조금 더 걸렸는데, 독학이라는게 이런가보다 하고 넘어가자.
다음 글은 커리큘럼에 따라 설명 가능한 AI(Explainable AI)에 대한 글이 되지 않을까 싶다.
끝!
'Python > PyTorch' 카테고리의 다른 글
[PyTorch]설명 가능한 AI - CAM (1) | 2024.12.16 |
---|---|
[PyTorch]생성적 적대 신경망(GAN - Generative Adversarial Network) (1) | 2024.12.09 |
[PyTorch]오토인코더(Autoencoder) (2) | 2024.12.04 |
[Pytorch]Vanilla RNN과 확장된 기법들: LSTM, GRU, Bidirectional LSTM, Transformer (2) | 2024.12.03 |
[PyTorch]전이 학습(Transfer Learning) (0) | 2024.11.27 |
[PyTorch]Vanilla RNN을 활용한 코스피 예측 문제 (1) | 2024.11.26 |
- Total
- Today
- Yesterday
- 세계여행
- Algorithm
- 스트림
- java
- spring
- 지지
- Python
- 칼이사
- 남미
- 리스트
- 유럽
- 중남미
- 파이썬
- 세모
- 동적계획법
- 세계일주
- 야경
- a6000
- Backjoon
- 맛집
- 백준
- 기술면접
- 스프링
- 알고리즘
- BOJ
- RX100M5
- 면접 준비
- 여행
- 자바
- 유럽여행
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |