# CartPole with A2C (Advantage Actor-Critic)
import sys
import gym
import pylab
import numpy as np
from keras.layers import Dense
from keras.models import Sequential
from keras.optimizers import Adam
from keras import backend as K
EPISODES = 5
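
# The paths used below assume ./save_model and ./save_graph exist; creating
# them up front avoids an IOError on the first save (a small addition, not
# in the original listing).
import os
os.makedirs("./save_model", exist_ok=True)
os.makedirs("./save_graph", exist_ok=True)
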
class A2CAgent:
    def __init__(self, state_size, action_size):
        self.render = False
        self.load_model = False
        # sizes of the state/action spaces and of the critic's value head
        self.state_size = state_size
        self.action_size = action_size
        self.value_size = 1
        # A2C hyperparameters
        self.discount_factor = 0.99
        self.actor_lr = 0.001
        self.critic_lr = 0.005
        # build the policy (actor) and value (critic) networks, plus the
        # backend functions that update them
        self.actor = self.build_actor()
        self.critic = self.build_critic()
        self.actor_updater = self.actor_optimizer()
        self.critic_updater = self.critic_optimizer()
        if self.load_model:
            self.actor.load_weights("./save_model/cartpole_actor_trained.h5")
            self.critic.load_weights("./save_model/cartpole_critic_trained.h5")
    def build_actor(self):
        # policy network: state -> softmax probabilities over actions
        actor = Sequential()
        actor.add(Dense(24, input_dim=self.state_size, activation='relu',
                        kernel_initializer='he_uniform'))
        actor.add(Dense(self.action_size, activation='softmax',
                        kernel_initializer='he_uniform'))
        actor.summary()
        return actor
    def build_critic(self):
        # value network: state -> scalar state value V(s)
        critic = Sequential()
        critic.add(Dense(24, input_dim=self.state_size, activation='relu',
                         kernel_initializer='he_uniform'))
        critic.add(Dense(24, activation='relu',
                         kernel_initializer='he_uniform'))
        critic.add(Dense(self.value_size, activation='linear',
                         kernel_initializer='he_uniform'))
        critic.summary()
        return critic
    def get_action(self, state):
        # sample an action from the policy's probability distribution
        policy = self.actor.predict(state, batch_size=1).flatten()
        return np.random.choice(self.action_size, 1, p=policy)[0]
    def actor_optimizer(self):
        action = K.placeholder(shape=[None, self.action_size])
        advantage = K.placeholder(shape=[None, ])

        # probability of the action that was actually taken (one-hot dot
        # product), weighted by the advantage
        action_prob = K.sum(action * self.actor.output, axis=1)
        cross_entropy = K.log(action_prob) * advantage
        loss = -K.sum(cross_entropy)

        optimizer = Adam(lr=self.actor_lr)
        updates = optimizer.get_updates(self.actor.trainable_weights, [], loss)
        train = K.function([self.actor.input, action, advantage], [],
                           updates=updates)
        return train
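
    # Note: the loss above is the standard policy-gradient objective,
    #     L = -sum_t A_t * log(pi(a_t | s_t)),
    # where A_t is the advantage fed in from train_model(). Maximizing the
    # advantage-weighted log-probabilities pushes the policy toward actions
    # that did better than the critic expected.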
    def critic_optimizer(self):
        target = K.placeholder(shape=[None, ])

        # mean squared error between V(s) and the bootstrapped target
        loss = K.mean(K.square(target - self.critic.output))

        optimizer = Adam(lr=self.critic_lr)
        updates = optimizer.get_updates(self.critic.trainable_weights, [], loss)
        train = K.function([self.critic.input, target], [], updates=updates)
        return train
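
    # Note: the critic is trained as a plain regression on the one-step TD
    # target, minimizing (V(s) - target)^2 with target = r + gamma * V(s'),
    # computed in train_model() below.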
    def train_model(self, state, action, reward, next_state, done):
        # current and next state values from the critic
        value = self.critic.predict(state)[0]
        next_value = self.critic.predict(next_state)[0]

        # one-hot encode the action that was taken
        act = np.zeros([1, self.action_size])
        act[0][action] = 1

        # advantage and critic target from the one-step Bellman equation;
        # on terminal steps there is no next state to bootstrap from
        if done:
            advantage = reward - value
            target = [reward]
        else:
            advantage = (reward + self.discount_factor * next_value) - value
            target = reward + self.discount_factor * next_value

        self.actor_updater([state, act, advantage])
        self.critic_updater([state, target])
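
# Worked example of the update above (illustrative numbers, not from a run):
# with V(s) = 1.0, V(s') = 1.2, r = 1 and gamma = 0.99 on a non-terminal step,
#     target    = 1 + 0.99 * 1.2 = 2.188
#     advantage = 2.188 - 1.0    = 1.188
# so the critic regresses V(s) toward 2.188 and the positive advantage pushes
# up the log-probability of the action that was taken.
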
if __name__ == "__main__":
    # CartPole-v1 caps episodes at 500 steps
    env = gym.make('CartPole-v1')
    state_size = env.observation_space.shape[0]
    action_size = env.action_space.n

    agent = A2CAgent(state_size, action_size)
    scores, episodes = [], []

    for e in range(EPISODES):
        done = False
        score = 0
        state = env.reset()
        state = np.reshape(state, [1, state_size])

        while not done:
            if agent.render:
                env.render()

            action = agent.get_action(state)
            next_state, reward, done, info = env.step(action)
            next_state = np.reshape(next_state, [1, state_size])
            # penalize early termination so the agent learns to balance;
            # the -100 is added back below for reporting
            reward = reward if not done or score == 499 else -100

            agent.train_model(state, action, reward, next_state, done)

            score += reward
            state = next_state

            if done:
                # undo the -100 penalty for episodes that ended early
                score = score if score == 500.0 else score + 100
                scores.append(score)
                episodes.append(e)
                pylab.plot(episodes, scores, 'b')
                pylab.savefig("./save_graph/cartpole_a2c.png")
                print("episode:", e, " score:", score)

                # stop once the mean of the last 10 scores exceeds 490
                if np.mean(scores[-min(10, len(scores)):]) > 490:
                    agent.actor.save_weights("./save_model/cartpole_actor.h5")
                    agent.critic.save_weights("./save_model/cartpole_critic.h5")
                    sys.exit()
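
# Compatibility note: this script targets the classic Gym API (gym <= 0.25),
# where env.reset() returns only the observation and env.step() returns four
# values, and Keras 2.x on the TensorFlow 1.x backend (K.placeholder,
# optimizer.get_updates, Adam(lr=...)). Newer gym/gymnasium and tf.keras
# releases changed these interfaces.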