013-004. Using Char-CNN for Intent Classification
# @
import requests
import tensorflow as tf
import numpy as np
import os
from matplotlib.image import imread, imsave
import matplotlib.pyplot as plt
import pandas as pd
from konlpy.tag import Mecab
from gensim.models import word2vec
print("load done")
# load done
# @
# Parameters
vector_size = 50
encode_length = 4
label_size = 3
embed_type = "onehot"  # onehot or w2v
# You can choose the single-filter test
# filter_type = "single"
# filter_number = 32
# filter_size = 2
# You can choose the multi-filter test
filter_type = "multi"
filter_sizes = [2,3,4,2,3,4,2,3,4]  # one conv branch per entry
num_filters = len(filter_sizes)     # filters per branch (9)
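# With these parameters every sentence is padded or truncated to encode_length = 4
# morphemes, and each morpheme becomes a vector_size = 50 dimensional vector, so
# each network input is a flat 4 * 50 = 200 dimensional vector (see embed() below).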
train_data_list = {
    'encode' : ['판교에 오늘 피자 주문해줘',         # "order pizza in Pangyo today"
                '오늘 날짜에 호텔 예약 해줄레',       # "will you book a hotel for today's date"
                '모래 날짜에 판교 여행 정보 알려줘'],  # "give me Pangyo travel info for the day after tomorrow"
    'decode' : ['0','1','2']                         # intent labels for the three sentences
}
train_data_list.get('encode')
# < ['판교에 오늘 피자 주문해줘', '오늘 날짜에 호텔 예약 해줄레', '모래 날짜에 판교 여행 정보 알려줘']
# I vectorize the string data with word2vec
def train_vector_model(str_buf):
    # I create a Mecab instance
    mecab = Mecab('/usr/local/lib/mecab/dic/mecab-ko-dic')
    # ['판교에 오늘 피자 주문해줘','오늘 날짜에 호텔 예약 해줄레','모래 날짜에 판교 여행 정보 알려줘']
    str_buf = str_buf['encode']
    # I perform mecab pos tagging on str_buf
    pos1 = mecab.pos(''.join(str_buf))
    # I split the tagged stream into sentences on sentence-final punctuation (SF)
    pos2 = ' '.join(list(map(lambda x : '\n' if x[1] in ['SF'] else x[0], pos1))).split('\n')
    morphs = list(map(lambda x : mecab.morphs(x), pos2))
    print(str_buf)
    model = word2vec.Word2Vec(size=vector_size, window=2, min_count=1)
    model.build_vocab(morphs)
    # note: gensim >= 1.0 requires model.train(morphs, total_examples=model.corpus_count, epochs=model.iter)
    model.train(morphs)
    return model
model = train_vector_model(train_data_list)
print(model)
# < ['판교에 오늘 피자 주문해줘', '오늘 날짜에 호텔 예약 해줄레', '모래 날짜에 판교 여행 정보 알려줘']
# < Word2Vec(vocab=15, size=50, alpha=0.025)
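# Once the word2vec model is trained you can inspect it; this is an illustrative
# check using gensim's standard KeyedVectors API (nearest neighbours depend on the
# random initialization, so no output is shown here):
print(model.wv.most_similar('판교'))
print(model.wv['판교'][:5])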
# @
# This method wraps the train data in a DataFrame (despite the name, nothing is read from disk)
def load_csv(data_path):
    df_csv_read = pd.DataFrame(data_path)
    return df_csv_read
# This method embeds words into vectors
def embed(data) :
    mecab = Mecab('/usr/local/lib/mecab/dic/mecab-ko-dic')
    inputs = []
    labels = []
    # ['판교에 오늘 피자 주문해줘','오늘 날짜에 호텔 예약 해줄레','모래 날짜에 판교 여행 정보 알려줘']
    for encode_raw in data['encode'] :
        # e.g. encode_raw = mecab.morphs('판교에 오늘 피자 주문해줘')
        encode_raw = mecab.morphs(encode_raw)
        # I pad or truncate every sentence to encode_length tokens ('#' is the pad token)
        encode_raw = list(map(lambda x : encode_raw[x] if x < len(encode_raw) else '#', range(encode_length)))
        # parameter embed_type is initialized to onehot
        if(embed_type == 'onehot') :
            # I make a vector_size dimensional zero array
            bucket = np.zeros(vector_size, dtype=float)
            # if x is in the vocabulary, I invoke onehot_vectorize()
            input = np.array(list(map(lambda x : onehot_vectorize(bucket, x) if x in model.wv.index2word else np.zeros(vector_size, dtype=float), encode_raw)))
        else :
            input = np.array(list(map(lambda x : model.wv[x] if x in model.wv.index2word else np.zeros(vector_size, dtype=float), encode_raw)))
        inputs.append(input.flatten())
    # ['0','1','2']
    for decode_raw in data['decode']:
        # I create a label_size dimensional zero array
        label = np.zeros(label_size, dtype=float)
        # I set the position given by decode_raw to 1 (one-hot label)
        np.put(label, int(decode_raw), 1)
        labels.append(label)
    return inputs, labels

def onehot_vectorize(bucket, x):
    # I copy the zero bucket so every word gets its own one-hot vector
    vec = np.copy(bucket)
    np.put(vec, model.wv.index2word.index(x), 1)
    return vec
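# A quick shape check on the embedding (illustrative; assumes the cells above have
# run). Each sentence becomes encode_length one-hot vectors of vector_size
# dimensions, flattened into a single row:
demo_inputs, demo_labels = embed(load_csv(train_data_list))
print(np.array(demo_inputs).shape)   # expected (3, 200) : 3 sentences x (4 * 50)
print(np.array(demo_labels).shape)   # expected (3, 3)   : 3 sentences x 3 intent classes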
# @
# This method embeds words into vectors at prediction time
def inference_embed(data) :
    mecab = Mecab('/usr/local/lib/mecab/dic/mecab-ko-dic')
    encode_raw = mecab.morphs(data)
    encode_raw = list(map(lambda x : encode_raw[x] if x < len(encode_raw) else '#', range(encode_length)))
    if(embed_type == 'onehot') :
        bucket = np.zeros(vector_size, dtype=float)
        input = np.array(list(map(lambda x : onehot_vectorize(bucket, x) if x in model.wv.index2word else np.zeros(vector_size, dtype=float), encode_raw)))
    else :
        input = np.array(list(map(lambda x : model.wv[x] if x in model.wv.index2word else np.zeros(vector_size, dtype=float), encode_raw)))
    return input
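# inference_embed() returns an (encode_length, vector_size) matrix; the predict
# calls at the bottom flatten it before feeding, hence np.array(...).flatten().
# Illustrative check:
print(np.array(inference_embed("판교에 오늘 피자 주문해줘")).shape)   # expected (4, 50)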
# @
# This method returns the train and test datasets to feed into the tensorflow session
# (for this toy example the test set is identical to the train set)
def get_test_data():
    train_data, train_label = embed(load_csv(train_data_list))
    test_data, test_label = embed(load_csv(train_data_list))
    return train_label, test_label, train_data, test_data
# @
# This method creates a graph with a single filter size
# (it relies on filter_size / filter_number from the commented-out single-filter parameters above)
def create_s_graph(train=True):
    # placeholders x and y_target will be fed with data later
    x = tf.placeholder("float", shape=[None, encode_length * vector_size], name = 'x')
    y_target = tf.placeholder("float", shape=[None, label_size], name = 'y_target')
    # I reshape the flat input into an (encode_length x vector_size) image with one channel
    x_image = tf.reshape(x, [-1, encode_length, vector_size, 1], name="x_image")
    # I build the convolutional layer and maxpooling with random initialization
    # W is [rows, cols, channels, features]
    W_conv1 = tf.Variable(tf.truncated_normal([filter_size, filter_size, 1, filter_number], stddev=0.1), name="W_conv1")
    # This is the bias
    b_conv1 = tf.Variable(tf.zeros([filter_number]), name="b_conv1")
    h_conv1 = tf.nn.relu(tf.nn.conv2d(x_image, W_conv1, strides=[1, 1, 1, 1], padding='SAME') + b_conv1, name="h_conv1")
    h_pool1 = tf.nn.max_pool(h_conv1, ksize=[1, 2, 2, 1], strides=[1, 2, 2, 1], padding='SAME', name="h_pool1")
    # I build a fully connected layer (the 2x2 pooling halves each spatial dimension)
    h_pool2_flat = tf.reshape(h_pool1, [-1, int((encode_length/2)*(vector_size/2))*filter_number], name="h_pool2_flat")
    W_fc1 = tf.Variable(tf.truncated_normal([int((encode_length/2)*(vector_size/2))*filter_number, 256], stddev=0.1), name = 'W_fc1')
    b_fc1 = tf.Variable(tf.zeros([256]), name = 'b_fc1')
    h_fc1 = tf.nn.relu(tf.matmul(h_pool2_flat, W_fc1) + b_fc1, name="h_fc1")
    keep_prob = 1.0
    if(train) :
        # This plays the role of a dropout layer
        keep_prob = tf.placeholder("float", name="keep_prob")
        h_fc1 = tf.nn.dropout(h_fc1, keep_prob, name="h_fc1_drop")
    # I build the final fully connected layer (softmax is applied inside the loss below)
    W_fc2 = tf.Variable(tf.truncated_normal([256, label_size], stddev=0.1), name = 'W_fc2')
    b_fc2 = tf.Variable(tf.zeros([label_size]), name = 'b_fc2')
    #y = tf.nn.softmax(tf.matmul(h_fc1, W_fc2) + b_fc2, name="y")
    y = tf.matmul(h_fc1, W_fc2) + b_fc2
    # I define the loss as the mean softmax cross entropy
    #cross_entropy = -tf.reduce_sum(y_target*tf.log(y), name = 'cross_entropy')
    cross_entropy = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=y, labels=y_target))
    # I define the optimization algorithm with AdamOptimizer
    #train_step = tf.train.GradientDescentOptimizer(0.01).minimize(cross_entropy)
    train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
    # I obtain the prediction by argmax()
    # correct_prediction is a list of booleans comparing the model prediction with the label
    correct_prediction = tf.equal(tf.argmax(y, 1), tf.argmax(y_target, 1))
    # I evaluate the prediction model
    # tf.cast() : changes true -> 1 / false -> 0
    # tf.reduce_mean() : calculates the mean
    accuracy = tf.reduce_mean(tf.cast(correct_prediction, "float"))
    # I return everything the caller needs
    return accuracy, x, y_target, keep_prob, train_step, y, cross_entropy, W_conv1
print("define cnn graph func")
# < define cnn graph func
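# Shape arithmetic for the single-filter graph (a sketch using the commented-out
# single-filter parameters, filter_number = 32): the SAME-padded conv keeps the
# input at (4, 50), the 2x2 max-pool halves it to (2, 25), so the flattened layer
# has int((4/2) * (50/2)) * 32 = 2 * 25 * 32 = 1600 units, matching h_pool2_flat above.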
# @
# This method creates a graph with multiple filter sizes
def create_m_graph(train=True):
    x = tf.placeholder("float", shape=[None, encode_length * vector_size], name = 'x')
    y_target = tf.placeholder("float", shape=[None, label_size], name = 'y_target')
    x_image = tf.reshape(x, [-1, encode_length, vector_size, 1], name="x_image")
    l2_loss = tf.constant(0.0)
    pooled_outputs = []
    for i, filter_size in enumerate(filter_sizes):
        with tf.name_scope("conv-maxpool-%s" % filter_size):
            # Convolution layer: the filter spans filter_size tokens and the full vector width
            filter_shape = [filter_size, vector_size, 1, num_filters]
            W_conv1 = tf.Variable(tf.truncated_normal(filter_shape, stddev=0.1), name="W")
            b_conv1 = tf.Variable(tf.constant(0.1, shape=[num_filters]), name="b")
            conv = tf.nn.conv2d(
                x_image,
                W_conv1,
                strides=[1, 1, 1, 1],
                padding="VALID",
                name="conv")
            # ReLU applies the nonlinearity
            h = tf.nn.relu(tf.nn.bias_add(conv, b_conv1), name="relu")
            # I perform max-over-time pooling on the outputs
            pooled = tf.nn.max_pool(
                h,
                ksize=[1, encode_length - filter_size + 1, 1, 1],
                strides=[1, 1, 1, 1],
                padding='VALID',
                name="pool")
            pooled_outputs.append(pooled)
    # I combine all the pooled features
    num_filters_total = num_filters * len(filter_sizes)
    h_pool = tf.concat(pooled_outputs, 3)
    h_pool_flat = tf.reshape(h_pool, [-1, num_filters_total])
    # I use dropout during training only
    keep_prob = 1.0
    if(train) :
        keep_prob = tf.placeholder("float", name="keep_prob")
        h_pool_flat = tf.nn.dropout(h_pool_flat, keep_prob)
    # These are the final (unnormalized) scores and predictions
    W_fc1 = tf.get_variable(
        "W_fc1",
        shape=[num_filters_total, label_size],
        initializer=tf.contrib.layers.xavier_initializer())
    b_fc1 = tf.Variable(tf.constant(0.1, shape=[label_size]), name="b")
    l2_loss += tf.nn.l2_loss(W_fc1)
    l2_loss += tf.nn.l2_loss(b_fc1)
    y = tf.nn.xw_plus_b(h_pool_flat, W_fc1, b_fc1, name="scores")
    predictions = tf.argmax(y, 1, name="predictions")
    # I calculate the mean cross-entropy loss
    losses = tf.nn.softmax_cross_entropy_with_logits(logits=y, labels=y_target)
    cross_entropy = tf.reduce_mean(losses)
    train_step = tf.train.AdamOptimizer(1e-4).minimize(cross_entropy)
    # Accuracy
    correct_predictions = tf.equal(predictions, tf.argmax(y_target, 1))
    accuracy = tf.reduce_mean(tf.cast(correct_predictions, "float"), name="accuracy")
    return accuracy, x, y_target, keep_prob, train_step, y, cross_entropy, W_conv1
print("define cnn graph func")
# < define cnn graph func
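# Shape walk-through for one multi-filter branch, taking filter_size = 3 as an
# example: the VALID conv over the (4, 50) input leaves (batch, 4 - 3 + 1, 1, 9),
# max-pooling over the remaining 2 positions gives (batch, 1, 1, 9), and
# concatenating all 9 branches yields num_filters_total = 9 * 9 = 81 features.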
# @
# This method visualizes the weight matrices
def show_layer(weight_list) :
    if(filter_type == 'multi') :
        show = np.array(weight_list).reshape(num_filters, filter_sizes[np.argmax(filter_sizes)], vector_size)
        for i, matrix in enumerate(show) :
            fig = plt.figure()
            plt.imshow(matrix)
            plt.show()
    else:
        show = np.array(weight_list).reshape(32, 2, 2)
        for i, matrix in enumerate(show) :
            fig = plt.figure()
            plt.imshow(matrix)
            plt.show()
# @
# This method runs the training
def run() :
    try :
        # First, I need to get the data
        labels_train, labels_test, data_filter_train, data_filter_test = get_test_data()
        # I reset the graph
        tf.reset_default_graph()
        # I create a session
        sess = tf.Session(config=tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True)))
        # I create the graph
        if(filter_type == 'single') :
            accuracy, x, y_target, keep_prob, train_step, y, cross_entropy, W_conv1 = create_s_graph(train=True)
        else :
            accuracy, x, y_target, keep_prob, train_step, y, cross_entropy, W_conv1 = create_m_graph(train=True)
        # set the saver (tf.all_variables is deprecated in favor of tf.global_variables)
        saver = tf.train.Saver(tf.global_variables())
        # initialize the variables
        sess.run(tf.global_variables_initializer())
        # train the network (keep_prob 0.5 while training, 1.0 while evaluating)
        for i in range(200):
            sess.run(train_step, feed_dict={x: data_filter_train, y_target: labels_train, keep_prob: 0.5})
            if i%10 == 0:
                train_accuracy = sess.run(accuracy, feed_dict={x: data_filter_train, y_target: labels_train, keep_prob: 1})
                print ("step %d, training accuracy: %.3f"%(i, train_accuracy))
        # evaluate on the given x, y_target data set
        print ("test accuracy: %g"% sess.run(accuracy, feed_dict={x: data_filter_test, y_target: labels_test, keep_prob: 1}))
        # fetch the weight matrix so it can be shown as an image
        weight_vectors = sess.run(W_conv1, feed_dict={x: data_filter_train, y_target: labels_train, keep_prob: 1.0})
        #show_layer(weight_vectors)
        # Save the model
        path = './model/'
        if not os.path.exists(path):
            os.makedirs(path)
            print("path created")
        saver.save(sess, path)
        print("model saved")
    except Exception as e :
        raise Exception ("error on training: {0}".format(e))
    finally :
        sess.close()
# run stuff
run()
# < step 0, training accuracy: 0.333
# < step 10, training accuracy: 0.333
# < step 20, training accuracy: 0.333
# < step 30, training accuracy: 0.333
# < step 40, training accuracy: 0.667
# < step 50, training accuracy: 0.667
# < step 60, training accuracy: 1.000
# < step 70, training accuracy: 1.000
# < step 80, training accuracy: 1.000
# < step 90, training accuracy: 1.000
# < step 100, training accuracy: 1.000
# < step 110, training accuracy: 1.000
# < step 120, training accuracy: 1.000
# < step 130, training accuracy: 1.000
# < step 140, training accuracy: 1.000
# < step 150, training accuracy: 1.000
# < step 160, training accuracy: 1.000
# < step 170, training accuracy: 1.000
# < step 180, training accuracy: 1.000
# < step 190, training accuracy: 1.000
# < test accuracy: 1
# < model saved
# @
# This method predicts the intent of a test sample
def predict(test_data) :
    try :
        # reset the graph
        tf.reset_default_graph()
        # create a session
        sess = tf.Session(config=tf.ConfigProto(gpu_options=tf.GPUOptions(allow_growth=True)))
        # create the graph (train=False, so dropout is disabled)
        if(filter_type == 'single') :
            _, x, _, _, _, y, _, _ = create_s_graph(train=False)
        else :
            _, x, _, _, _, y, _, _ = create_m_graph(train=False)
        # initialize the variables
        sess.run(tf.global_variables_initializer())
        # set the saver
        saver = tf.train.Saver()
        # restore the model
        path = './model/'
        if os.path.exists(path):
            saver.restore(sess, path)
            print("model restored")
        # run the prediction
        #print("input data : {0}".format(test_data))
        logits = sess.run([y], feed_dict={x: np.array([test_data])})
        print("result : {0}".format(logits))
        print("result : {0}".format(np.argmax(logits)))
    except Exception as e :
        raise Exception ("error on prediction: {0}".format(e))
    finally :
        sess.close()
print("words in dict : {0}".format(model.wv.index2word))
predict(np.array(inference_embed("판교에 오늘 피자 주문해줘")).flatten())        # "order pizza in Pangyo today"
predict(np.array(inference_embed("오늘 날짜에 호텔 예약 해줄수있어")).flatten())  # "can you book a hotel for today?" (phrasing not seen in training)
predict(np.array(inference_embed("모래 날짜에 판교 여행 정보 알려줘")).flatten())  # "give me Pangyo travel info for the day after tomorrow"
# < words in dict : ['에', '오늘', '해', '날짜', '판교', '알려줘', '피자', '예약', '여행', '정보', '주문', '모래', '호텔', '줄레', '줘']
# < INFO:tensorflow:Restoring parameters from ./model/
# < model restored
# < result : [array([[ 0.27849197, 0.08609096, -0.26966506]], dtype=float32)]
# < result : 0
# < INFO:tensorflow:Restoring parameters from ./model/
# < model restored
# < result : [array([[-0.28460395, 0.45481044, -0.13316658]], dtype=float32)]
# < result : 1
# < INFO:tensorflow:Restoring parameters from ./model/
# < model restored
# < result : [array([[-0.35431033, -0.54590696, 0.54696906]], dtype=float32)]
# < result : 2
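# @
# The printed results are raw logits and argmax picks the intent. To read them as
# probabilities you could run a row through a softmax by hand; this is purely
# illustrative, reusing the first result row printed above:
logits = np.array([0.27849197, 0.08609096, -0.26966506])
probs = np.exp(logits) / np.sum(np.exp(logits))
print(probs)   # the largest entry is index 0, i.e. the pizza-order intent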