Info

  • Author : 천용희 (Yonghee Cheon)
  • Type : 코드 실습 설명
  • Description : Pytorch로 구현하는 Word2Vec 기본 구조

들어가며

백문이 불여일견이고 백논문은 불여일코딩입니다.
컨셉은 알더라도 이를 구현해보면 더욱 확실하게 와닿습니다.
시작합니다.


모듈 임포트

from typing import *
from itertools import chain

import torch
import torch.nn.functional as F
from nltk import word_tokenize
from nltk.stem import WordNetLemmatizer
from torch.autograd import Variable

전처리 클래스

Intuition

  • Word2Vec Trainer에서 메소드 실행 한 번이면 전처리를 모두 완료하고 싶어서 전처리 함수를 묶어 클래스로 만들었습니다.
  • 학습을 위해 필요한 전처리 과정을 모두 담은 클래스를 만듭니다.
  • 우선 필요한 기능만 나열해 스켈레톤을 짜봅니다.

Skeleton

  • Word2Vec은 결국 윈도우 안에만 있으면 그외 컨텍스트를 반영하지 않기 때문에 Bag Of Words로 클래스 이름을 정했습니다.
  • 토큰화 -> token2idx생성 -> 윈도우 내 pair생성 -> pair 생성 wrapper로 구성하였습니다.
class BagOfWords:
    def __init__(self):
    
    def make_tokenized_matrix_english(self, texts: List[str], lemmatize=True):
    
    def make_token_indices(self):
     
    def get_window_pairs(self, tokens: List[str], win_size=4, as_index=True) -> List[Tuple]:
     
    def make_pairs_matrix(self, win_size, as_index=True):

토큰화 및 인덱싱

  • nltk의 tokenizer 및 lemmatizer를 이용하여 토큰화합니다.
  • lemmatize는 옵션으로 사용할 수 있게 두었습니다.
    def __init__(self):
        lemm = WordNetLemmatizer()
        self.lemmatizer = lemm.lemmatize

    def make_tokenized_matrix_english(self, texts: List[str], lemmatize=True):
        if lemmatize:
            self.tokenized_matrix = [[self.lemmatizer(word) for word in word_tokenize(text)] for text in texts]
        else:
            self.tokenized_matrix = [word_tokenize(text) for text in texts]

    def make_token_indices(self):
        assert self.tokenized_matrix
        self.unique_tokens = get_uniques_from_nested_lists(self.tokenized_matrix)
        self.token2idx, self.idx2token = get_item2idx(self.unique_tokens, unique=True)
        self.vocab_size = len(self.token2idx)
  • unique_token과 token2idx 생성시에는 아래 함수를 사용합니다.
def get_uniques_from_nested_lists(nested_lists: List[List]) -> List:
    uniques = {}
    for one_line in nested_lists:
        for item in one_line:
            if not uniques.get(item):
                uniques[item] = 1
    return list(uniques.keys())


def get_item2idx(items, unique=False) -> Tuple[Dict, Dict]:
    item2idx, idx2item = dict(), dict()
    items_unique = items if unique else set(items)
    for idx, item in enumerate(items_unique):
        item2idx[item] = idx
        idx2item[idx] = item
    return item2idx, idx2item

페어 생성 부분

  • start, end의 max, min 부분은 각 문서의 인덱스를 넘어가지 않도록 방지하는 부분입니다.
  • 윈도우와 문서 인덱스 내 있는 페어 중 자기 자신이 중복으로 들어가는 경우만 제외하고 모두 페어가 됩니다.
  • as_index가 True면 토큰의 인덱스가, False면 토큰이 그대로 저장됩니다.
    def get_window_pairs(self, tokens: List[str], win_size=4, as_index=True) -> List[Tuple]:
        window_pairs = []
        for idx, token in enumerate(tokens):
            start = max(0, idx - win_size)
            end = min(len(tokens), idx + win_size + 1)
            for win_idx in range(start, end):
                if not idx == win_idx:
                    pair = (token, tokens[win_idx])
                    pair = pair if not as_index else tuple(self.token2idx[t] for t in pair)
                    window_pairs.append(pair)
        return window_pairs

    def make_pairs_matrix(self, win_size, as_index=True):
        self.pairs_matrix = [self.get_window_pairs(sent, win_size, as_index) for sent in self.tokenized_matrix]
        self.pairs_flat = list(chain.from_iterable(self.pairs_matrix))

전처리 코드 완성본

def get_uniques_from_nested_lists(nested_lists: List[List]) -> List:
    uniques = {}
    for one_line in nested_lists:
        for item in one_line:
            if not uniques.get(item):
                uniques[item] = 1
    return list(uniques.keys())


def get_item2idx(items, unique=False) -> Tuple[Dict, Dict]:
    item2idx, idx2item = dict(), dict()
    items_unique = items if unique else set(items)
    for idx, item in enumerate(items_unique):
        item2idx[item] = idx
        idx2item[idx] = item
    return item2idx, idx2item


class BagOfWords:
    def __init__(self):
        lemm = WordNetLemmatizer()
        self.lemmatizer = lemm.lemmatize

    def make_tokenized_matrix_english(self, texts: List[str], lemmatize=True):
        if lemmatize:
            self.tokenized_matrix = [[self.lemmatizer(word) for word in word_tokenize(text)] for text in texts]
        else:
            self.tokenized_matrix = [word_tokenize(text) for text in texts]

    def make_token_indices(self):
        assert self.tokenized_matrix
        self.unique_tokens = get_uniques_from_nested_lists(self.tokenized_matrix)
        self.token2idx, self.idx2token = get_item2idx(self.unique_tokens, unique=True)
        self.vocab_size = len(self.token2idx)

    def get_window_pairs(self, tokens: List[str], win_size=4, as_index=True) -> List[Tuple]:
        window_pairs = []
        for idx, token in enumerate(tokens):
            start = max(0, idx - win_size)
            end = min(len(tokens), idx + win_size + 1)
            for win_idx in range(start, end):
                if not idx == win_idx:
                    pair = (token, tokens[win_idx])
                    pair = pair if not as_index else tuple(self.token2idx[t] for t in pair)
                    window_pairs.append(pair)
        return window_pairs

    def make_pairs_matrix(self, win_size, as_index=True):
        self.pairs_matrix = [self.get_window_pairs(sent, win_size, as_index) for sent in self.tokenized_matrix]
        self.pairs_flat = list(chain.from_iterable(self.pairs_matrix))

Word2Vec Trainer

  • BoW와 마찬가지로 클래스로 작성하여 바깥에서 두세줄로 학습이 실행되도록 만듭니다.

Skeleton

class TrainWord2Vec:
    def __init__(self):

    def prepare_corpus(self, corpus):
    	# (대충 전처리한다는 내용)

	def get_input_layer(self, word_idx):
    	# one hot encoding 수행
            
    def train(self, emb_dimension=30, epochs=10, lr=0.001, continue_last=False):
    	# center, context vector 초기화
        
        for epoch in range(epochs):
        	# 학습

전처리와 원 핫 인코딩 메소드

  • 위에서 작성한 BOW 클래스를 활용하여 전처리를 진행합니다.
  • 원 핫 인코딩 메소드의 경우 모듈화를 할 때 전역함수로 빼도 무방합니다. 지금은 한 코드베이스에서 모든 것을 수행하는 상황을 가정했기 때문에 클래스 안에 작성합니다.
    def prepare_corpus(self, corpus):
        self.bow = BagOfWords()
        self.bow.make_tokenized_matrix_english(corpus, lemmatize=True)
        self.bow.make_token_indices()
        self.bow.make_pairs_matrix(win_size=2, as_index=True)
        self.vocab_size = len(self.bow.token2idx)
        self.train_size = len(self.bow.pairs_flat)

    def get_input_layer(self, word_idx):
        layer = torch.zeros(self.vocab_size)
        layer[word_idx] = 1.0
        return layer

모델링

벡터(가중치) 초기화

  • Word2Vec은 뉴럴넷의 학습 과정을 통해 얻어진 가중치 매트릭스를 Representation으로 삼는 것이라고 볼 수 있기 때문에 가중치가 곧 우리가 원하는 타겟 벡터입니다.
  • Xavier 초기화를 하였습니다.
  • 임베딩 차원을 D, 전체 unique tokens 갯수를 V라고 할 때, center matrix는 (D, V)의 크기를, context matrix는 (V, D)의 크기를 갖도록 만듭니다.
    • 이유는 이후 look-up 과정을 통해 center vector는 (D, 1)의 크기를 갖게 되는데, matmul((V, D), (D, 1))을 하면 center vector를 모든 context vector와 내적하는 과정을 편리하게 계산할 수 있기 때문입니다.
self.center_vectors = Variable(torch.nn.init.xavier_normal(torch.empty(emb_dimension, self.vocab_size)),
requires_grad=True).float()

self.context_vectors = Variable(torch.nn.init.xavier_normal(torch.empty(self.vocab_size, emb_dimension)),
requires_grad=True).float()

epoch, iteration

  • 학습과정 확인을 위해 모든 iteration의 loss_value를 평균하여 epoch loss value를 출력합니다.
  • batch size가 1인 SGD 방식으로 구현하였습니다.
  • 매 iteration마다 center, context 벡터를 생성하는데 input_layer는 원핫 벡터로, y_true는 정답값을 원핫이 아닌 실수로 생성합니다.
for epoch in range(epochs):
    loss_value = 0
    for center_i, context_i in self.bow.pairs_flat:
        input_layer = Variable(self.get_input_layer(center_i)).float()
        y_true = Variable(torch.from_numpy(np.array([context_i])).long())

forward pass

  1. look-up

    • matmul(self.center_vectors, input_layer)는 matmul((D, V), (V, 1))의 연산입니다.
    • 결과는 (D, 1)의 center vector 하나가 나옵니다.input_layer가 원핫 벡터이기 때문에 자신의 index에 해당하는 D차원의 벡터만 가져오게 됩니다.
    • 인덱싱 역할을 합니다.
  2. inner product

    • 위에서 한 번 설명했던 내적부분입니다.
    • matmul((V, D), (D, 1))을 하면 center vector를 모든 context vector들과 내적한 결과물 (V, 1) 벡터가 나옵니다.
  3. softmax

    • 이 내적값을 각각 softmax함수에 씌웁니다.
    • torch에서는 log_softmax를 제공하기 때문에 이 함수를 사용하여 loss에 forward하기위해 log를 명시적으로 따로 씌우지 않았습니다.
  4. nll_loss

    • negative log likelihood loss를 이용합니다.
    • view는 numpy의 reshape과 유사합니다.
    • output_layer를 (1, V)로 만듭니다.
  5. loss value update

    • gradient update와는 상관 없지만, 학습과정을 보기 위하여 forward pass로 얻어진 loss를 epoch 내 모두 더합니다.
center_vector = torch.matmul(self.center_vectors, input_layer)
inner_products = torch.matmul(self.context_vectors, center_vector)
output_layer = F.log_softmax(inner_products, dim=0)
loss = F.nll_loss(output_layer.view(1, self.vocab_size), y_true)

loss_value += loss.item()

backward pass

  • 최초에 Variable을 requires_grad=True로 선언했기 때문에 backward()로 초기 가중치까지 모두 gradient가 계산됩니다.
  • 이 gradient 값에 learning rate을 곱한 후 빼서 gradient를 update합니다.
  • gradient 값은 원래 텐서와 같은 차원으로 저장되어 있습니다.
  • update 이후 grad.data.zero_()를 실행해 gradient를 다시 초기화합니다.
loss.backward()

self.center_vectors.data -= lr * self.center_vectors.grad.data
self.context_vectors.data -= lr * self.context_vectors.grad.data

self.center_vectors.grad.data.zero_()
self.context_vectors.grad.data.zero_()

printing losses

  • 다른 프레임워크들에서 verbose 옵션 주는 것을 항상 따라하고 있습니다.
  • 이 함수에는 조금 더 생각을 하고 verbose값을 줘야 합니다.
  • epoch마다 평균 loss value가 출력됩니다.
if epoch % (10 ** verbose) == 0:
	print(f"Loss at this epoch {epoch}: {loss_value / self.train_size}")

모델링 코드 완성본

class TrainWord2Vec:
    def __init__(self):
        pass

    def prepare_corpus(self, corpus: List[str], win_size: int):
        self.bow = BagOfWords()
        self.bow.make_tokenized_matrix_english(corpus, lemmatize=True)
        self.bow.make_token_indices()
        self.bow.make_pairs_matrix(win_size=win_size, as_index=True)
        self.vocab_size = len(self.bow.token2idx)
        self.train_size = len(self.bow.pairs_flat)

    def prepare_corpus_from_tokenized_bow(self, bow: BagOfWords, win_size: int):
        self.bow = bow
        self.bow.make_token_indices()
        self.bow.make_pairs_matrix(win_size=win_size, as_index=True)
        self.vocab_size = len(self.bow.token2idx)
        self.train_size = len(self.bow.pairs_flat)

    def get_input_layer(self, word_idx):
        layer = torch.zeros(self.vocab_size)
        layer[word_idx] = 1.0
        return layer

    def train(self, emb_dimension=30, epochs=10, lr=0.001, continue_last=False, verbose=1):
        if not continue_last:
            self.center_vectors = Variable(torch.nn.init.xavier_normal(torch.empty(emb_dimension, self.vocab_size)),
                                           requires_grad=True).float()
            self.context_vectors = Variable(torch.nn.init.xavier_normal(torch.empty(self.vocab_size, emb_dimension)),
                                            requires_grad=True).float()

        for epoch in range(epochs):
            loss_value = 0
            for center_i, context_i in self.bow.pairs_flat:
                input_layer = Variable(self.get_input_layer(center_i)).float()
                y_true = Variable(torch.from_numpy(np.array([context_i])).long())

                center_vector = torch.matmul(self.center_vectors, input_layer)
                inner_products = torch.matmul(self.context_vectors, center_vector)
                output_layer = F.log_softmax(inner_products, dim=0)

                loss = F.nll_loss(output_layer.view(1, self.vocab_size), y_true)
                loss_value += loss.item()
                loss.backward()

                self.center_vectors.data -= lr * self.center_vectors.grad.data
                self.context_vectors.data -= lr * self.context_vectors.grad.data

                self.center_vectors.grad.data.zero_()
                self.context_vectors.grad.data.zero_()

            if epoch % (10 ** verbose) == 0:
                print(f"Loss at this epoch {epoch}: {loss_value / self.train_size}")

학습 실행

  • 모델을 잘 만들었으니 이제 학습만 시키면 됩니다.
  • 학습을 위해 7개의 문장으로 약식 코퍼스를 만듭니다.
  • 아래와 같이 trainer가 인스턴스화, 전처리, 학습이 각각 한줄만으로 실행되는 것을 볼 수 있습니다.
from train_ import *

eng_corpus = ['he is a king',
              'she is a queen',
              'he is a man',
              'she is a woman',
              'warsaw is poland capital',
              'berlin is germany capital',
              'paris is france capital']

wv_trainer = TrainWord2Vec()
wv_trainer.prepare_corpus(eng_corpus, win_size=2)

emb_dim = 5
wv_trainer.train(emb_dimension=emb_dim, epochs=1000, continue_last=False, lr=0.01, verbose=0)

시각화

TSNE plot 사용

  • center word 벡터를 가져와 numpy array로 변환합니다.
  • tsne plot을 통해 시각화합니다.
  • 아래 시각화 함수는 따로 정의해두었습니다.
wc_arr = np.array(wv_trainer.center_vectors.data.T.data)
tsne_plot(wv_trainer.bow.unique_tokens, wc_arr, filename=f'remote_wc_word_vector.jpg', perplexity=4)

결과물

  • 국가(poland, france, germany)와 수도(paris, berlin, warsaw)들은 각각의 클러스터를 형성하고 있는 것을 볼 수 있습니다.
  • he - man, she - woman, king - queen도 나름의 거리를 유지하고 있습니다
    • n차원의 공간을 2차원으로 시각화한 것으로 실제 계산하기 전까지는 평행여부나 벡터방향의 정확성을 확인하기는 어렵습니다
    • 다만 적당한 거리씩을 유지하고 있다는 것은 확인할 수 있습니다.

끝마치며

  • 실제로 사용할만큼 많은 코퍼스를 학습하려면 batch로 학습, negative sampling 적용 등이 필요합니다.
  • 원론에 입각한 간단한 모델로도 결과물을 볼 수 있다는 것은 고무적입니다.