Info
- Author : 천용희 (Yonghee Cheon)
- Type : 코드 실습 설명
- Description : Pytorch로 구현하는 Word2Vec 기본 구조
들어가며
백문이 불여일견이고 백논문은 불여일코딩입니다.
컨셉은 알더라도 이를 구현해보면 더욱 확실하게 와닿습니다.
시작합니다.
모듈 임포트
from typing import *
from itertools import chain
import torch
import torch.nn.functional as F
from nltk import word_tokenize
from nltk.stem import WordNetLemmatizer
from torch.autograd import Variable
전처리 클래스
Intuition
- Word2Vec Trainer에서 메소드 실행 한 번이면 전처리를 모두 완료하고 싶어서 전처리 함수를 묶어 클래스로 만들었습니다.
- 학습을 위해 필요한 전처리 과정을 모두 담은 클래스를 만듭니다.
- 우선 필요한 기능만 나열해 스켈레톤을 짜봅니다.
Skeleton
- Word2Vec은 결국 윈도우 안에만 있으면 그외 컨텍스트를 반영하지 않기 때문에 Bag Of Words로 클래스 이름을 정했습니다.
- 토큰화 -> token2idx생성 -> 윈도우 내 pair생성 -> pair 생성 wrapper로 구성하였습니다.
class BagOfWords:
def __init__(self):
def make_tokenized_matrix_english(self, texts: List[str], lemmatize=True):
def make_token_indices(self):
def get_window_pairs(self, tokens: List[str], win_size=4, as_index=True) -> List[Tuple]:
def make_pairs_matrix(self, win_size, as_index=True):
토큰화 및 인덱싱
- nltk의 tokenizer 및 lemmatizer를 이용하여 토큰화합니다.
- lemmatize는 옵션으로 사용할 수 있게 두었습니다.
def __init__(self):
lemm = WordNetLemmatizer()
self.lemmatizer = lemm.lemmatize
def make_tokenized_matrix_english(self, texts: List[str], lemmatize=True):
if lemmatize:
self.tokenized_matrix = [[self.lemmatizer(word) for word in word_tokenize(text)] for text in texts]
else:
self.tokenized_matrix = [word_tokenize(text) for text in texts]
def make_token_indices(self):
assert self.tokenized_matrix
self.unique_tokens = get_uniques_from_nested_lists(self.tokenized_matrix)
self.token2idx, self.idx2token = get_item2idx(self.unique_tokens, unique=True)
self.vocab_size = len(self.token2idx)
- unique_token과 token2idx 생성시에는 아래 함수를 사용합니다.
def get_uniques_from_nested_lists(nested_lists: List[List]) -> List:
uniques = {}
for one_line in nested_lists:
for item in one_line:
if not uniques.get(item):
uniques[item] = 1
return list(uniques.keys())
def get_item2idx(items, unique=False) -> Tuple[Dict, Dict]:
item2idx, idx2item = dict(), dict()
items_unique = items if unique else set(items)
for idx, item in enumerate(items_unique):
item2idx[item] = idx
idx2item[idx] = item
return item2idx, idx2item
페어 생성 부분
- start, end의 max, min 부분은 각 문서의 인덱스를 넘어가지 않도록 방지하는 부분입니다.
- 윈도우와 문서 인덱스 내 있는 페어 중 자기 자신이 중복으로 들어가는 경우만 제외하고 모두 페어가 됩니다.
- as_index가
True
면 토큰의 인덱스가,False
면 토큰이 그대로 저장됩니다.
def get_window_pairs(self, tokens: List[str], win_size=4, as_index=True) -> List[Tuple]:
window_pairs = []
for idx, token in enumerate(tokens):
start = max(0, idx - win_size)
end = min(len(tokens), idx + win_size + 1)
for win_idx in range(start, end):
if not idx == win_idx:
pair = (token, tokens[win_idx])
pair = pair if not as_index else tuple(self.token2idx[t] for t in pair)
window_pairs.append(pair)
return window_pairs
def make_pairs_matrix(self, win_size, as_index=True):
self.pairs_matrix = [self.get_window_pairs(sent, win_size, as_index) for sent in self.tokenized_matrix]
self.pairs_flat = list(chain.from_iterable(self.pairs_matrix))
전처리 코드 완성본
def get_uniques_from_nested_lists(nested_lists: List[List]) -> List:
uniques = {}
for one_line in nested_lists:
for item in one_line:
if not uniques.get(item):
uniques[item] = 1
return list(uniques.keys())
def get_item2idx(items, unique=False) -> Tuple[Dict, Dict]:
item2idx, idx2item = dict(), dict()
items_unique = items if unique else set(items)
for idx, item in enumerate(items_unique):
item2idx[item] = idx
idx2item[idx] = item
return item2idx, idx2item
class BagOfWords:
def __init__(self):
lemm = WordNetLemmatizer()
self.lemmatizer = lemm.lemmatize
def make_tokenized_matrix_english(self, texts: List[str], lemmatize=True):
if lemmatize:
self.tokenized_matrix = [[self.lemmatizer(word) for word in word_tokenize(text)] for text in texts]
else:
self.tokenized_matrix = [word_tokenize(text) for text in texts]
def make_token_indices(self):
assert self.tokenized_matrix
self.unique_tokens = get_uniques_from_nested_lists(self.tokenized_matrix)
self.token2idx, self.idx2token = get_item2idx(self.unique_tokens, unique=True)
self.vocab_size = len(self.token2idx)
def get_window_pairs(self, tokens: List[str], win_size=4, as_index=True) -> List[Tuple]:
window_pairs = []
for idx, token in enumerate(tokens):
start = max(0, idx - win_size)
end = min(len(tokens), idx + win_size + 1)
for win_idx in range(start, end):
if not idx == win_idx:
pair = (token, tokens[win_idx])
pair = pair if not as_index else tuple(self.token2idx[t] for t in pair)
window_pairs.append(pair)
return window_pairs
def make_pairs_matrix(self, win_size, as_index=True):
self.pairs_matrix = [self.get_window_pairs(sent, win_size, as_index) for sent in self.tokenized_matrix]
self.pairs_flat = list(chain.from_iterable(self.pairs_matrix))
Word2Vec Trainer
- BoW와 마찬가지로 클래스로 작성하여 바깥에서 두세줄로 학습이 실행되도록 만듭니다.
Skeleton
class TrainWord2Vec:
def __init__(self):
def prepare_corpus(self, corpus):
# (대충 전처리한다는 내용)
def get_input_layer(self, word_idx):
# one hot encoding 수행
def train(self, emb_dimension=30, epochs=10, lr=0.001, continue_last=False):
# center, context vector 초기화
for epoch in range(epochs):
# 학습
전처리와 원 핫 인코딩 메소드
- 위에서 작성한 BOW 클래스를 활용하여 전처리를 진행합니다.
- 원 핫 인코딩 메소드의 경우 모듈화를 할 때 전역함수로 빼도 무방합니다. 지금은 한 코드베이스에서 모든 것을 수행하는 상황을 가정했기 때문에 클래스 안에 작성합니다.
def prepare_corpus(self, corpus):
self.bow = BagOfWords()
self.bow.make_tokenized_matrix_english(corpus, lemmatize=True)
self.bow.make_token_indices()
self.bow.make_pairs_matrix(win_size=2, as_index=True)
self.vocab_size = len(self.bow.token2idx)
self.train_size = len(self.bow.pairs_flat)
def get_input_layer(self, word_idx):
layer = torch.zeros(self.vocab_size)
layer[word_idx] = 1.0
return layer
모델링
벡터(가중치) 초기화
- Word2Vec은 뉴럴넷의 학습 과정을 통해 얻어진 가중치 매트릭스를 Representation으로 삼는 것이라고 볼 수 있기 때문에 가중치가 곧 우리가 원하는 타겟 벡터입니다.
- Xavier 초기화를 하였습니다.
- 임베딩 차원을 D, 전체 unique tokens 갯수를 V라고 할 때, center matrix는 (D, V)의 크기를, context matrix는 (V, D)의 크기를 갖도록 만듭니다.
- 이유는 이후 look-up 과정을 통해 center vector는 (D, 1)의 크기를 갖게 되는데, matmul((V, D), (D, 1))을 하면 center vector를 모든 context vector와 내적하는 과정을 편리하게 계산할 수 있기 때문입니다.
self.center_vectors = Variable(torch.nn.init.xavier_normal(torch.empty(emb_dimension, self.vocab_size)),
requires_grad=True).float()
self.context_vectors = Variable(torch.nn.init.xavier_normal(torch.empty(self.vocab_size, emb_dimension)),
requires_grad=True).float()
epoch, iteration
- 학습과정 확인을 위해 모든 iteration의 loss_value를 평균하여 epoch loss value를 출력합니다.
- batch size가 1인 SGD 방식으로 구현하였습니다.
- 매 iteration마다 center, context 벡터를 생성하는데 input_layer는 원핫 벡터로, y_true는 정답값을 원핫이 아닌 실수로 생성합니다.
for epoch in range(epochs):
loss_value = 0
for center_i, context_i in self.bow.pairs_flat:
input_layer = Variable(self.get_input_layer(center_i)).float()
y_true = Variable(torch.from_numpy(np.array([context_i])).long())
forward pass
-
look-up
- matmul(self.center_vectors, input_layer)는 matmul((D, V), (V, 1))의 연산입니다.
- 결과는 (D, 1)의 center vector 하나가 나옵니다.input_layer가 원핫 벡터이기 때문에 자신의 index에 해당하는 D차원의 벡터만 가져오게 됩니다.
- 인덱싱 역할을 합니다.
-
inner product
- 위에서 한 번 설명했던 내적부분입니다.
- matmul((V, D), (D, 1))을 하면 center vector를 모든 context vector들과 내적한 결과물 (V, 1) 벡터가 나옵니다.
-
softmax
- 이 내적값을 각각 softmax함수에 씌웁니다.
- torch에서는 log_softmax를 제공하기 때문에 이 함수를 사용하여 loss에 forward하기위해 log를 명시적으로 따로 씌우지 않았습니다.
-
nll_loss
- negative log likelihood loss를 이용합니다.
- view는 numpy의 reshape과 유사합니다.
- output_layer를 (1, V)로 만듭니다.
-
loss value update
- gradient update와는 상관 없지만, 학습과정을 보기 위하여 forward pass로 얻어진 loss를 epoch 내 모두 더합니다.
center_vector = torch.matmul(self.center_vectors, input_layer)
inner_products = torch.matmul(self.context_vectors, center_vector)
output_layer = F.log_softmax(inner_products, dim=0)
loss = F.nll_loss(output_layer.view(1, self.vocab_size), y_true)
loss_value += loss.item()
backward pass
- 최초에 Variable을 requires_grad=True로 선언했기 때문에 backward()로 초기 가중치까지 모두 gradient가 계산됩니다.
- 이 gradient 값에 learning rate을 곱한 후 빼서 gradient를 update합니다.
- gradient 값은 원래 텐서와 같은 차원으로 저장되어 있습니다.
- update 이후 grad.data.zero_()를 실행해 gradient를 다시 초기화합니다.
loss.backward()
self.center_vectors.data -= lr * self.center_vectors.grad.data
self.context_vectors.data -= lr * self.context_vectors.grad.data
self.center_vectors.grad.data.zero_()
self.context_vectors.grad.data.zero_()
printing losses
- 다른 프레임워크들에서 verbose 옵션 주는 것을 항상 따라하고 있습니다.
- 이 함수에는 조금 더 생각을 하고 verbose값을 줘야 합니다.
- epoch마다 평균 loss value가 출력됩니다.
if epoch % (10 ** verbose) == 0:
print(f"Loss at this epoch {epoch}: {loss_value / self.train_size}")
모델링 코드 완성본
class TrainWord2Vec:
def __init__(self):
pass
def prepare_corpus(self, corpus: List[str], win_size: int):
self.bow = BagOfWords()
self.bow.make_tokenized_matrix_english(corpus, lemmatize=True)
self.bow.make_token_indices()
self.bow.make_pairs_matrix(win_size=win_size, as_index=True)
self.vocab_size = len(self.bow.token2idx)
self.train_size = len(self.bow.pairs_flat)
def prepare_corpus_from_tokenized_bow(self, bow: BagOfWords, win_size: int):
self.bow = bow
self.bow.make_token_indices()
self.bow.make_pairs_matrix(win_size=win_size, as_index=True)
self.vocab_size = len(self.bow.token2idx)
self.train_size = len(self.bow.pairs_flat)
def get_input_layer(self, word_idx):
layer = torch.zeros(self.vocab_size)
layer[word_idx] = 1.0
return layer
def train(self, emb_dimension=30, epochs=10, lr=0.001, continue_last=False, verbose=1):
if not continue_last:
self.center_vectors = Variable(torch.nn.init.xavier_normal(torch.empty(emb_dimension, self.vocab_size)),
requires_grad=True).float()
self.context_vectors = Variable(torch.nn.init.xavier_normal(torch.empty(self.vocab_size, emb_dimension)),
requires_grad=True).float()
for epoch in range(epochs):
loss_value = 0
for center_i, context_i in self.bow.pairs_flat:
input_layer = Variable(self.get_input_layer(center_i)).float()
y_true = Variable(torch.from_numpy(np.array([context_i])).long())
center_vector = torch.matmul(self.center_vectors, input_layer)
inner_products = torch.matmul(self.context_vectors, center_vector)
output_layer = F.log_softmax(inner_products, dim=0)
loss = F.nll_loss(output_layer.view(1, self.vocab_size), y_true)
loss_value += loss.item()
loss.backward()
self.center_vectors.data -= lr * self.center_vectors.grad.data
self.context_vectors.data -= lr * self.context_vectors.grad.data
self.center_vectors.grad.data.zero_()
self.context_vectors.grad.data.zero_()
if epoch % (10 ** verbose) == 0:
print(f"Loss at this epoch {epoch}: {loss_value / self.train_size}")
학습 실행
- 모델을 잘 만들었으니 이제 학습만 시키면 됩니다.
- 학습을 위해 7개의 문장으로 약식 코퍼스를 만듭니다.
- 아래와 같이 trainer가 인스턴스화, 전처리, 학습이 각각 한줄만으로 실행되는 것을 볼 수 있습니다.
from train_ import *
eng_corpus = ['he is a king',
'she is a queen',
'he is a man',
'she is a woman',
'warsaw is poland capital',
'berlin is germany capital',
'paris is france capital']
wv_trainer = TrainWord2Vec()
wv_trainer.prepare_corpus(eng_corpus, win_size=2)
emb_dim = 5
wv_trainer.train(emb_dimension=emb_dim, epochs=1000, continue_last=False, lr=0.01, verbose=0)
시각화
TSNE plot 사용
- center word 벡터를 가져와 numpy array로 변환합니다.
- tsne plot을 통해 시각화합니다.
- 아래 시각화 함수는 따로 정의해두었습니다.
wc_arr = np.array(wv_trainer.center_vectors.data.T.data)
tsne_plot(wv_trainer.bow.unique_tokens, wc_arr, filename=f'remote_wc_word_vector.jpg', perplexity=4)
결과물
- 국가(poland, france, germany)와 수도(paris, berlin, warsaw)들은 각각의 클러스터를 형성하고 있는 것을 볼 수 있습니다.
- he - man, she - woman, king - queen도 나름의 거리를 유지하고 있습니다
- n차원의 공간을 2차원으로 시각화한 것으로 실제 계산하기 전까지는 평행여부나 벡터방향의 정확성을 확인하기는 어렵습니다
- 다만 적당한 거리씩을 유지하고 있다는 것은 확인할 수 있습니다.
끝마치며
- 실제로 사용할만큼 많은 코퍼스를 학습하려면 batch로 학습, negative sampling 적용 등이 필요합니다.
- 원론에 입각한 간단한 모델로도 결과물을 볼 수 있다는 것은 고무적입니다.