"""007_002_lab_dqn_2013_cartpole.py

DQN (2013) agent for OpenAI Gym CartPole-v0, trained with experience replay
and an epsilon-greedy policy whose epsilon is linearly annealed.
"""
import numpy as np
import tensorflow as tf
import random
import dqn
import gym
from collections import deque

env = gym.make('CartPole-v0')
env = gym.wrappers.Monitor(env, 'gym-results/', force=True)
INPUT_SIZE = env.observation_space.shape[0]
OUTPUT_SIZE = env.action_space.n

DISCOUNT_RATE = 0.99
REPLAY_MEMORY = 50000
MAX_EPISODE = 5000
BATCH_SIZE = 64

# minimum epsilon for epsilon greedy
MIN_E = 0.0
# epsilon will be `MIN_E` at `EPSILON_DECAYING_EPISODE`
EPSILON_DECAYING_EPISODE = MAX_EPISODE * 0.01


def bot_play(mainDQN: dqn.DQN) -> None:
    """Runs a single episode with rendering and prints the total reward

    Args:
        mainDQN (dqn.DQN): DQN Agent
    """
    state = env.reset()
    total_reward = 0

    while True:
        env.render()
        action = np.argmax(mainDQN.predict(state))
        state, reward, done, _ = env.step(action)
        total_reward += reward
        if done:
            print("Total score: {}".format(total_reward))
            break


def train_minibatch(DQN: dqn.DQN, train_batch: list) -> float:
    """Prepare X_batch, y_batch and train the network on them

    Recall our loss function is
        target = reward + discount * max Q(s', a)
                 or reward if done early

        Loss function: [target - Q(s, a)]^2

    Hence,
        X_batch is a state list
        y_batch is reward + discount * max Q,
                 or reward if terminated early

    Args:
        DQN (dqn.DQN): DQN Agent to train & run
        train_batch (list): Minibatch of replay memory.
            Each element is a tuple of (s, a, r, s', done)

    Returns:
        float: Loss value from the update step
    """
    state_array = np.vstack([x[0] for x in train_batch])
    action_array = np.array([x[1] for x in train_batch])
    reward_array = np.array([x[2] for x in train_batch])
    next_state_array = np.vstack([x[3] for x in train_batch])
    done_array = np.array([x[4] for x in train_batch])

    X_batch = state_array
    y_batch = DQN.predict(state_array)

    # Bellman target: r + discount * max_a' Q(s', a');
    # `~done_array` zeroes the bootstrap term for terminal transitions
    Q_target = reward_array + DISCOUNT_RATE * np.max(DQN.predict(next_state_array), axis=1) * ~done_array
    y_batch[np.arange(len(X_batch)), action_array] = Q_target

    # Train our network using target and predicted Q values on each episode
    loss, _ = DQN.update(X_batch, y_batch)

    return loss


def annealing_epsilon(episode: int, min_e: float, max_e: float, target_episode: int) -> float:
    """Return a linearly annealed epsilon

    Epsilon will decrease over time until it reaches `target_episode`

         (epsilon)
             |
    max_e ---|\
             | \
             |  \
             |   \
    min_e ---|____\_______________ (episode)
                  |
             target_episode

     slope = (min_e - max_e) / (target_episode)
     intercept = max_e

     e = slope * episode + intercept

    Args:
        episode (int): Current episode
        min_e (float): Minimum epsilon
        max_e (float): Maximum epsilon
        target_episode (int): epsilon becomes `min_e` at `target_episode`

    Returns:
        float: epsilon between `min_e` and `max_e`
    """
    slope = (min_e - max_e) / target_episode
    intercept = max_e

    return max(min_e, slope * episode + intercept)


def main():
    # store the previous observations in replay memory
    replay_buffer = deque(maxlen=REPLAY_MEMORY)

    last_100_game_reward = deque(maxlen=100)

    with tf.Session() as sess:
        mainDQN = dqn.DQN(sess, INPUT_SIZE, OUTPUT_SIZE)
        init = tf.global_variables_initializer()
        sess.run(init)

        for episode in range(MAX_EPISODE):
            e = annealing_epsilon(episode, MIN_E, 1.0, EPSILON_DECAYING_EPISODE)
            done = False
            state = env.reset()

            step_count = 0
            while not done:

                if np.random.rand() < e:
                    action = env.action_space.sample()
                else:
                    action = np.argmax(mainDQN.predict(state))

                next_state, reward, done, _ = env.step(action)

                if done:
                    # Penalize the terminal transition
                    reward = -1

                replay_buffer.append((state, action, reward, next_state, done))

                state = next_state
                step_count += 1

                if len(replay_buffer) > BATCH_SIZE:
                    minibatch = random.sample(replay_buffer, BATCH_SIZE)
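                    # Sampling uniformly from the replay buffer (experience
                    # replay) breaks the correlation between consecutive
                    # transitions before the gradient step below.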
                    train_minibatch(mainDQN, minibatch)

            print("[Episode {:>5}] steps: {:>5} e: {:>5.2f}".format(episode, step_count, e))

            # CartPole-v0 Game Clear Logic
            last_100_game_reward.append(step_count)

            if len(last_100_game_reward) == last_100_game_reward.maxlen:
                avg_reward = np.mean(last_100_game_reward)

                if avg_reward > 199.0:
                    print("Game Cleared within {} episodes with avg reward {}".format(episode, avg_reward))
                    break


if __name__ == "__main__":
    main()
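
# ---------------------------------------------------------------------------
# Reference sketch (assumption): the `dqn` module imported at the top is a
# separate file that is not shown here.  The class below is only a guess at
# the interface this script relies on -- a constructor taking
# (session, input_size, output_size), plus `predict` and `update` -- written
# as a small fully-connected Q-network.  It is illustrative, not the lab's
# actual dqn.py; the hidden-layer size and learning rate are arbitrary.
# After training, `bot_play(mainDQN)` could be called inside main() to watch
# the greedy policy with rendering.
# ---------------------------------------------------------------------------
class DQNSketch:
    """Minimal Q-network matching the (assumed) interface used above."""

    def __init__(self, session: tf.Session, input_size: int, output_size: int, name: str = "main") -> None:
        self.session = session
        self.input_size = input_size
        self.output_size = output_size

        with tf.variable_scope(name):
            # Q(s, .) approximated by one hidden ReLU layer
            self._X = tf.placeholder(tf.float32, [None, input_size], name="input_x")
            hidden = tf.layers.dense(self._X, 16, activation=tf.nn.relu)
            self._Qpred = tf.layers.dense(hidden, output_size)

            # Mean-squared error against the Bellman targets built in train_minibatch()
            self._Y = tf.placeholder(tf.float32, [None, output_size], name="output_y")
            self._loss = tf.losses.mean_squared_error(self._Y, self._Qpred)
            self._train = tf.train.AdamOptimizer(learning_rate=1e-3).minimize(self._loss)

    def predict(self, state: np.ndarray) -> np.ndarray:
        """Return Q-values for a single state or a batch of states."""
        x = np.reshape(state, [-1, self.input_size])
        return self.session.run(self._Qpred, feed_dict={self._X: x})

    def update(self, x_batch: np.ndarray, y_batch: np.ndarray) -> list:
        """Run one gradient step; returns [loss, train_op output]."""
        feed = {self._X: x_batch, self._Y: y_batch}
        return self.session.run([self._loss, self._train], feed_dict=feed)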