파이토치로 시작하는 딥러닝 기초 - Part4.RNN
- Recurrent Neural Network에 대한 소개
- RNN 기초
- RNN hihello & charseq
- Long sequence
- Timeseries
- seq2seq
- PackedSequence
PART 4: RNN
Lab11-0. RNN intro
- Sequential 데이터를 잘 다루기 위해 만들어진 모델
(데이터 값 뿐만 아니라 ‘순서’도 중요한 데이터)- ex) 단어, 문장, 시계열 etc
RNN
- 모든 Cell이 같은 Parameter를 공유한다.
- A에서는
- ex
- ex
- RNN의 사용
- one to many
- ex) 입력: 하나의 이미지 -> 출력: 문장
- many to one
- ex) 입력: 문장 -> 츨략: 감정에 대한 label
- many to many
- ex) 입력: 문장 -> 출력: 문장
- one to many
Lab11-1. RNN basics
PyTorch에서의 RNN 사용?
rnn = torch.nn.RNN(input_size, hidden_size) # 둘 다 integer outputs, _status = rnn(input_data) #input_data: 입력코자 하는 데이터
- input 데이터의 shape: 3개의 차원을 가짐 (x, y, z)
단어 ‘hello’를 RNN에 입력하는 과정?
# One hot encoding h = [1, 0, 0, 0] e = [0, 1, 0, 0] l = [0, 0, 1, 0] o = [0, 0, 0, 1] input_size = 4 # 4개의 차원을 input으로 받는다. -> input data의 shape는 (-,-,4) 형태 # word embedding을 사용할 경우에는 embedding vector의 dimension이 input size가 됨 # 이 input size는 model에 직접 알려주어야 함 hidden_size = 2 # 어떤 사이즈의 벡터를 출력하기를 원하는가? -> output의 shape는 (-,-,2) 형태 # ex. 기쁨, 슬픔 중 하나 출력 # 역시 model에게 미리 이를 선언해주어야 함 # 결론적으로, 4차원 벡터를 받아서 2차원 벡터를 출력하는 꼴 ...
- 여기서 드는 의문
‘어떻게 hidden state의 dimension이 output size와 동일한 차원을 갖게 되는가?’RNN cell 내부구조를 보면 이해할 수 있음
- 출력직전에 두 개의 가지로 갈라지는 설계!
- Sequence Length?
- HELLO: Sequence Length=5
x0 = [1,0,0,0]
x1 = [0,1,0,0]
x2 = [0,0,1,0]
x3 = [0,0,1,0]
x4 = [0,0,0,1] - Pytorch는 Sequence Length를 굳이 알려주지 않아도 알아서 파악함 -> input data의 shape는 (-,5,4)
- 따라서 input data만 잘 만들어주면 된다.
-> output data의 shape는 (-,5,2)
- 따라서 input data만 잘 만들어주면 된다.
- HELLO: Sequence Length=5
- Batch Size?
- RNN 또한 여러 개의 데이터를 하나의 Batch로 묶어서 모델에게 학습시킬 수 있다.
예를 들어, h,e,l,o 이 4개의 문자로 만들어진 수많은 단어들 중에서
h,e,l,l,o / e,o,l,l,l / l,l,e,e,l 과 같이 주어졌을 때, 이 세 개를 단어를 묶어서 하나의 배치로 구성하고
이를 모델에게 전달.- Batchsize 역시 모델이 자동으로 파악
-> Input data의 shape는 (3, 5, 4)
-> Output data의 shape는 (3, 5, 2)
(Batch size, Sequence length, embedding size)
전체 코드
import torch import numpy as np input_size = 4 hidden_size = 2 # One hot encoding h = [1, 0, 0, 0] e = [0, 1, 0, 0] l = [0, 0, 1, 0] o = [0, 0, 0, 1] input_data_np = np.array([[h, e, l, l, o], [e, o, l, l, l], [l, l, e, e, l]], dtype=np.float32) # Torch Tensor로 Transform input_data = torch.Tensor(input_data_np) # RNN 모델 rnn = torch.nn.RNN(input_size, hidden_size) outputs, _status = rnn(input_data)
Lab11-2. RNN hihello and charseq
- Character들을 어떻게 표현하는 것이 좋을까?
- 방법1. By index?
‘h’ -> 0
‘i’ -> 1
‘e’ -> 2
‘l’ -> 3
‘o’ -> 4
—그닥 좋은 방법은 아님
∵ 숫자의 크기에 따라 의미를 주는 셈.방법2. One-hot Encoding
char_set = ['h', 'i', 'e', 'l', 'o'] x_data = [[0,1,0,2,3,3]] x_one_hot = [[[1,0,0,0,0], [0,1,0,0,0], [1,0,0,0,0], [0,0,1,0,0], [0,0,0,1,0], [0,0,0,1,0]]] y_data = [[1,0,2,3,3,4]]
- Character들을 어떻게 표현하는 것이 좋을까?
- Cross Entropy Loss
Categorical output을 예측하는 모델에서 주로 쓰임
criterion = torch.nn.CrossEntropyLoss() ... # 첫번째 param: 모델의 output, 두번째 param: 정답 Label loss = criterion(outputs.view(-1, input_size), Y.view(-1))
‘hihello’ Code
char_set = ['h', 'i', 'e', 'l', 'o'] # hyper parameters input_size = len(char_set) hidden_size = len(char_set) learning_rate = 0.1 # data setting x_data = [[0,1,0,2,3,3]] x_one_hot = [[[1,0,0,0,0], [0,1,0,0,0], [1,0,0,0,0], [0,0,1,0,0], [0,0,0,1,0], [0,0,0,1,0]]] y_data = [[1,0,2,3,3,4]] X = torch.FloatTensor(x_one_hot) Y = torch.LongTensor(y_data)
charseq Code
import numpy as np sample = ' if you want you' # 딕셔너리 만들기 char_set = list(set(sample)) char_dic = {c: i for i, c in enumerate(char_set)} # hyper parameters dic_size = len(char_dic) hidden_size = len(char_dic) learning_rate = 0.1 # data setting sample_idx = [char_dic[c] for c in sample] x_data = [sample_idx[:-1]] x_one_hot = [np.eye(dic_size)[x] for x in x_data] # np.eye: Identity Matrix를 만들어 줌 y_data = [sample_idx[1:]] X = torch.FloatTensor(x_one_hot) Y = torch.LongTensor(y_data)
- 이어서, RNN 모델 만들기
# RNN 선언하기 # batch_first = True -> OUtput의 순서가 (B, S, F)임을 보장함 rnn = torch.nn.RNN(input_size, hidden_size, batch_first = True) # loss & optimizer setting criterion = torch.nn.CrossEntropyLoss() optimizer = optim.Adam(rnn.parameters(), learning_rate) # 학습 시작 for i in range(100): optimizer.zero_grad() outputs, _status = rnn(X) # X: input, _status: 만약 다음 input이 있다면 다음 RNN 계산에 쓰이게 될 hidden state loss = criterion(outputs.view(-1, input_size), Y.view(-1)) loss.backward() optimizer.step() result = outputs.data.numpy().argmax(axis=2) # Dim=2 -> 어떤 character인지를 나타냄 result_str = ''.join([char_set[c] for c in np.squeeze(result)]) print(i, 'loss: ', loss.item(), 'prediction: ', result, 'true Y: ', y_data, 'prediction str: ', result_str)
Lab11-3. Long sequence
11-2 보다 조금 더 긴 Character Sequence 모델링
아주 긴 문장을 하나의 Input으로 사용할 수는 없기에, 특정 사이즈로 잘라서 써야 함.
예시
sentence = ("if you want to build a ship, don't drum up people together to ", "collect wood and don't assign them tasks and work, but rather ", "teach them to long for the endless immensity of the sea.")
- Size 10의 chunk를 생각해보자.
- Size 10의 chunk를 생각해보자.
Sequence dataset 만들기
# 데이터 셋팅 x_data = [] y_data = [] for i in range(0, len(sentence) - sequence_length): x_str = sentence[i: i+sequence_length] y_str = sentence[i+1: i+sequence_length+1] print(i, x_str, '->', y_str) x_data.append([char_dic[c] for c in x_str]) # x str to index y_data.append([char_dic[c] for c in y_str]) # y str to index x_one_hot = [np.eye(dic_size)[x] for x in x_data] # transform as torch tensor variable X = torch.FloatTensor(x_one_hot) Y = torch.LongTensor(y_data)
- Vanilla RNN의 형태
- 하지만 모델이 Underfitting 된다든지, 그런 경우에, 좀 더 Complex한 모델을 만들고 싶을 수 있음
예) Fully Connected Layer 추가 + RNN Stacking
class Net(nn.Module): def __init__(self, input_dim, hidden_dim, layers): super(Net, self).__init__() self.rnn = torch.nn.RNN(input_dim, hidden_dim, num_layers = layers, batch_First = True) self.fc = torch.nn.Linear(hidden_dim, hidden_dim, bias=True) def forward(): x, _status = self.rnn(x) x = self.fc(x) return x net = Net(dic_size, hidden_size, 2) # RNN Layer 2개로 쌓겠다. # Loss & Optimizer 설정 criterion = torch.nn.CrossEntropyLoss() optimizer = optim.Adam((net.parameters(), learning_rate) # 훈련 시작 for i in range(100): optimizer.zero_grad() outputs = net(X) loss = criterion(outputs.view(-1, dic_size), Y.view(-1)) loss.backward() optimizer.step() # 모델이 예측한 결과물 해석 results = outputs.argmax(dim=2) predict_str = "" for j, result in enumerate(results): for j, result in enumerate(results): print(i, j, ''.join([char_set for t in result]), loss.item()) if j == 0: predict_str += ''.join([char_set[t] for t in result]) else: predict_str += char_set[result[:-1]]
Lab11-4. RNN Timeseries
- Timeseries Data?
- 시계열 데이터: 일정한 시간 간격으로 배치된 데이터
- ex. 주가 차트
- 예
- 구글의 일별 주가 데이터 (일별 시가, 고가, 저가, 종가, 거래량에 대한 데이터)
- ‘7일 간의 데이터를 입력받아서, 8일차의 종가 예측해보자!’
(사실 주식 시장에서 7일 간의 데이터를 ㅂ고 데이터를 예측하겠다는 건 역설이기는 함) 문제 정의: Many-to-One Structure
- 8일차 Vector의 Dim=1일 것 (종가 예측이기 때문)
- 문제는 Hidden State들의 Dim도 1이 되어야 된다는 건데, 이는 모델로서 굉장히 부담스러운 일
(이는 곧, 시가, 고가, 저가, 종가, 거래량 다섯개 데이털르 합쳐서 하나의 값으로 압축시켜야 하는 일이기 때문에)
- 문제는 Hidden State들의 Dim도 1이 되어야 된다는 건데, 이는 모델로서 굉장히 부담스러운 일
- 그래서, 8일차의 종가 예측을 위해서 Hidden State가 가진 Dimension 만큼 (이를 테면 10) 의 값을 최종 output으로 받고, 이 벡터에 Fully Connected Layer를 연결을 해서, 그 Layer의 Output이 종가를 맞추도록 하는 것이 일반적.
- 구글의 일별 주가 데이터 (일별 시가, 고가, 저가, 종가, 거래량에 대한 데이터)
코드
import torch import torch.optim as optim import numpy as np import matplotlib.pyplot as plt # Random seed 설정 torch.manual_seed(0) seq_length = 7 data_dim = 5 # 시가, 고가, 저가, 종가, 거래량 hidden_dim = 10 # 내가 임의로 설정 output_dim = 1 # Fully connected Layer가 맞추어야 할, 종가의 Dim learning_rate = 0.01 iterations = 500 xy = np.loadtxt('data-02-stock_daily.csv', delimeter=',') xy = xy[::-1] # 순서를 역순으로 만든다. train_size = int(len(xy) * 0.7) train_set = xy[0:train_size] test_set = xy[train_size - seq_length: ] # Scaling! ∵ 주가는 800선, 거래량은 1000000선 -> 이격이 너무 크다. # 모델은 1000000이 '숫자'로서 더 크다는 점을 인지를 함 # 따라서 모두 [0,1] range의 값으로 바꾸어준다. train_set = minmax_scaler(train_set) test_set = minmax_scaler(test_set) trainX, trainY = build_dataset(train_set, seq_length) testX, testY = build_dataset(test_set, seq_length) trainX_tensor = torch.FloatTensor(trainX) trainY_tensor = torch.FloatTensor(trainY) testX_tensor = torch.FloatTensor(testX) testY_tensor = torch.FloatTensor(testY)
About minmax_scaler & build_dataset
def minmax_scaler(data): numerator = data - np.min(data, 0) denominator = np.mad(data, 0) - np.min(data, 0) return numerator / (denominator + 1e-7) def build_dataset(time_series, seq_length): dataX = [] dataY = [] for i in range(0, len(time_series) - seq_length): _x = time_series[i: i+seq_length, :] _y = time_series[i + seq_length, [-1]] print(_x, '->', _y) dataX.append(_x) dataY.append(_y) return np.array(dataX), np.array(dataY)
NN 선언하기
class Net(torch.nn.Module): def __init__(self, input_dim, hidden_dim, output_dim, layers): super(Net, self).__init__() self.rnn = torch.nn.LSTM(input_dim, hidden_dim, num_layers=layers, batch_first=True) self.fc = torch.nn.Linear(hidden_dim, output_dim, bias=True) def forward(self, x): x, _status = self.rnn(x) x = self.fc(x[:, -1]) return x net = Net(data_dim, hidden_dim, output_dim, 1) criterion = torch.nn.MSELoss() optimizer = optim.Adam(net.parameters(), lr=learning_rate) for i in range(iterations): optimizer.zero_grad() outputs = net(trainX_tensor) loss = criterion(outputs, trainY_tensor) loss.backward() optimizer.step() print(i, loss.item()) plt.plot(testY) plt.plot(net(testX_tensor).data.numpy()) plt.legend(['original', 'prediction']) plt.show()
- 결과를 보면 굉장히 Prediction을 잘하는 것처럼 보이는데, 투자 모델 만들기 쉬워보이네?
- NONO, 주식시장은 변동을 일으키는 변수들이 정말 너무 많아서, 시가, 종가, 고가, 저가, 거래량 다섯 개의 피쳐만으로는 전체 주식시장의 예측이 정말 어려움
- 더많은 Feature를 고려할 것!
- 늘, 그렇게 성능이 좋지는 않아서, 심지어는 뉴스를 긁어와서, 감성분석을 한다거나 하는 등의 방법론을 통해 Feature들을 추가하는 사람들도 존재.
- Feature가 많아야 Robustness를 달성할 수 있을 (지도 모른다.)
Lab11-5. RNN seq2seq
Sequence를 입력받아서 Sequence를 출력하는 모델
- Seq2Seq
- 번역이나 Chatbot에서 잘 사용됨
- Encoder-Decoder 구조!
- Encoder
- 입력된 Sequence를 벡터의 형태로 압축
- 이 압축된 형태를 Decoder에 전달
- Decoder
- Encoder에서 만들어서 넘겨준 벡터를 첫 Cell의 Hidden State로 넘겨줌
- 문장이 시작한다는 <Start> Flag와 함께 Decoder 활용을 시작
- Encoder
Code
import random import torch import torch.nn as nn import torch.optim as optim ... SOURCE_MAX_LENGTH = 10 TARGET_MAX_LENGTH = 12 # raw: 원문, Source text 및 Target text를 구성하는 문장의 최대 길이를 제한 load_pairs, load_source_vocab, load_target_vocab = preprocess(raw, SOURCE_MAX_LENGTH, TARGET_MAX_LENGTH) print(random.choice(load_pairs)) enc_hidden_size = 16 dec_hidden_size = enc_hidden_size enc = Encoder(load_source_vovab.n_covab, enc_hidden_size).to(device) dec = Decoder(dec_hidden_size, load_target_vocab.n_vocab).to(device) train(load_pairs, load_source_vocab, load_target_vocab, enc, dec, 5000, print_every=1000) evaluate(load_paris, load_source_vocab, load_target_vocab, enc, dec, TARGET_MAX_LENGTH)
▲ 번역 Task를 시행하는 Seq2Seq 모델
- 번역: 번역의 원본이 되는 Source Text를 대상으로 함
Data Processing 단게
import random import torch import torch.nn as nn import torch.optim as optim torch.manual_seed(0) device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') raw = ["I feel hungry. 나는 배가 고프다.", "Pytorch is very easy. 파이토치는 매우 쉽다.", "Pytorch is a framework for deep learning. 파이토치는 딥러닝을 위한 프레임워크이다.", "Pytorch is very clear to use. 파이토치는 사용하기 매우 직관적이다."] SOS_token = 0 # Start of Sentence EOS_token = 1 # End of Sentence
# Source / Target Text를 나눠서 어떤 단어로 구성되어있고, 단어는 몇 개인지 측정 def preprocess(corpus, source_max_length, target_max_length): print('reading corpus...') pairs = [] for line in corpus: pairs.append([s for s in line.strip().lower().split('\t')]) print('Read {} sentence pairs'.format(len(pairs)) pairs = [pair for pair in paris if filter_pair(pair, source_max_length, target_max_length)] print('Trimmed to {} sentence pairs'.format(len(pairs)) source_vocab = Vocab() # Vocab이라는 클래스 선언 (이 클래스 내에 단어의 개수나 딕셔너리 같은 것들 넣어줌) target_vocab = Vocab() print('Counting words...') for pair in pairs: source_vocab.add_vocab(pair[0]) target_vocab.add_vocab(pair[1]) print('source vocab size =', source_vocab.n_vocab) print('target vocab size =', target_vocab.n_vocab) return pairs, source_vocab, target_vocab
Encoder
class Encoder(nn.Module): def __init__(self, input_size, hidden_size): super(Encoder, self).__init__() self.hidden_size = hidden_size self.embedding = nn.Embedding(input_size, hidden_size) self.gru = nn.GRU(hidden_size, hidden_size) def forward(self, x, hidden): x = self.embedding(x).view(1, 1, -1) x, hidden = self.gru(x, hidden) return x, hidden
Decoder
class Decoder(nn.Module): def __init__(self, hidden_size, output_size): super(Decoder, self).__init__() self.hidden_size = hidden_size self.embedding = nn.Embedding(output_size, hidden_size) self.gru = nn.GRU(hidden_size, hidden_size) self.out = nn.Linear(hidden_size, output_size) self.softmax = nn.LogSoftmax(dim=1) def forward(self, x, hidden): x = self.embedding(x).view(1, 1, -1) x, hidden = self.gru(x, hidden) x = self.softmax(self.out(x[0])) return x, hidden
학습 함수
# Sentence를 입력으로 받아서, sentence를 one-hot encoding으로 바꾸고, 최종적으로 Tensor의 형태로! def tensorize(vocab, sentence): indexes = [vocab.vocab2index[word] for word in sentence.split(' ')] indexes.append(vocab.vocab2index['<EOS>']) return torch.Tensor(indexes).long().to(device).view(-1,1) def train(pairs, source_vocab, target_vocab, encoder, decoder, n_iter, print_every=1000, learning_rate=0.01): loss_total = 0 encoder_optimizer = optim.SGD(encoder.parameters(), lr=learning_rate) decoder_optimizer = optim.SGD(decoder.parameters(), lr=learning_rate) training_batch = [random.choise(pairs) for _ in range(n_iter)] training_source = [tensorize(source_vocab, pair[0]) for pair in training_batch] training_target = [tensorize(target_vocab, pair[1]) for pair in training_batch] criterion = nn.NLLLoss() # Negative Loglikelihood
Training
def train(pairs, source_vocab, target_vocab, encoder, decoder, n_iter, print_every=1000, learning_rate=0.01): for i in range(1, n_iter+1): source_tensor = training_source[i-1] target_tensor = training_target[i-1] encoder_hidden = torch.zeros([1, 1, encoder.hidden_size]).to(device) encoder_optimizer.zero_grad() decoder_optimizer.zero_grad() source_length = source_tensor.size(0) target_length = target_tensor.size(0) loss = 0 for enc_input in range(source_length): _, encoder_hidden = encoder(source_tensor[enc_input], encoder_hidden) decoder_input = torch.Tensor([[SOS_token]]).long().to(device) decoder_hidden = encoder_hidden for di in range(target_length): decoder_output, decoder_hidden = decoder(decoder_input, decoder_hidden) loss += criterion(decoder_output, target_tensor[di]) decoder_input = target_tensor[di] # teacher forcing loss.backward() encoder_optimizer.step() decoder_optimizer.step() loss_iter = loss.item() / target_length loss_total += loss_iter if i % print_every == 0: loss_avg = loss_total / print_every loss_total = 0 print("[{} - {}%] loss = {:05.4f}".format(i, i/n_iter*100, loss_avg))
Lab11-6. PackedSequence
- 길이가 각각 다른 Sequence data를 하나의 Batch로 묶는 두 가지 방법