-
Notifications
You must be signed in to change notification settings - Fork 1.8k
/
Copy pathdqn.py
146 lines (111 loc) · 4.43 KB
/
dqn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
# coding:utf-8
import logging
import random
import gym
import numpy as np
from gym import wrappers
# Seed NumPy's global RNG so the np.random draws in this module (exploration
# coin-flips and random actions) are repeatable between runs. The stdlib
# `random` module, which is used for replay sampling, is NOT seeded here.
np.random.seed(9999)
# getLogger() without a name returns the root logger; its threshold is set to
# INFO so the per-episode progress messages are not filtered out by level.
logger = logging.getLogger()
logger.setLevel(logging.INFO)
"""
References:
Sutton, Barto (2017). Reinforcement Learning: An Introduction. MIT Press, Cambridge, MA.
"""
class DQN(object):
    """Deep Q-learning agent with experience replay and an ε-greedy policy.

    Typical usage is ``init_environment()`` -> ``init_model(factory)`` ->
    ``train()`` -> ``play(n)``.

    The environment is expected to follow the classic gym interface used by
    this class: ``reset()`` returns the observation and ``step(action)``
    returns a ``(observation, reward, done, info)`` 4-tuple.

    The model object must provide ``predict(states)`` returning one row of
    action values per state, and ``fit(X, y)``.
    """

    def __init__(
        self, n_episodes=500, gamma=0.99, batch_size=32, epsilon=1.0, decay=0.005, min_epsilon=0.1, memory_limit=500
    ):
        """Deep Q learning implementation.

        Parameters
        ----------
        n_episodes : int
            Number of training episodes run by `train`.
        gamma : float
            Discount factor applied to the best next-state action value.
        batch_size : int
            Maximum number of transitions sampled from replay memory per
            training step.
        epsilon : float
            ε-greedy value; also the starting point of the decay schedule.
        decay : float
            Epsilon decay rate.
        min_epsilon : float
            Minimal value for epsilon.
        memory_limit : int
            Limit of experience replay memory.
        """
        self.memory_limit = memory_limit
        self.min_epsilon = min_epsilon
        self.gamma = gamma
        self.epsilon = epsilon
        # Kept separately because `self.epsilon` is overwritten every episode,
        # while the decay schedule must always anneal from the configured start.
        self.initial_epsilon = epsilon
        self.n_episodes = n_episodes
        self.batch_size = batch_size
        self.decay = decay

    def init_environment(self, name="CartPole-v0", monitor=False):
        """Create the gym environment and reset the replay memory.

        Parameters
        ----------
        name : str
            Environment id passed to ``gym.make``.
        monitor : bool
            If True, wrap the environment in ``gym.wrappers.Monitor`` writing
            to a directory named after the environment (video disabled).
        """
        self.env = gym.make(name)
        if monitor:
            self.env = wrappers.Monitor(self.env, name, force=True, video_callable=False)

        self.n_states = self.env.observation_space.shape[0]
        self.n_actions = self.env.action_space.n

        # Experience replay
        self.replay = []

    def init_model(self, model):
        """Build the model by calling ``model(n_actions, batch_size)``.

        Must be called after `init_environment`, which sets ``n_actions``.
        """
        self.model = model(self.n_actions, self.batch_size)

    def train(self, render=False):
        """Run `n_episodes` of ε-greedy interaction, fitting the model each step.

        Parameters
        ----------
        render : bool
            If True, render the environment before every step.
        """
        max_reward = 0

        for ep in range(self.n_episodes):
            state = self.env.reset()
            total_reward = 0
            done = False

            while not done:
                if render:
                    self.env.render()

                action = self._select_action(state)

                # Run one timestep of the environment
                new_state, reward, done, _ = self.env.step(action)
                self.replay.append([state, action, reward, new_state, done])

                # Sample batch from experience replay
                batch_size = min(len(self.replay), self.batch_size)
                batch = random.sample(self.replay, batch_size)

                # Train deep learning model
                X, y = self._build_training_batch(batch)
                self.model.fit(X, y)

                total_reward += reward
                state = new_state

            # Memory is trimmed once per episode, so it may temporarily exceed
            # `memory_limit` while an episode is still running.
            self._trim_replay()
            self.epsilon = self._decayed_epsilon(ep)

            max_reward = max(max_reward, total_reward)
            logger.info(
                "Episode: %s, reward %s, epsilon %s, max reward %s", ep, total_reward, self.epsilon, max_reward
            )

        logger.info("Training finished.")

    def play(self, episodes):
        """Run `episodes` rendered episodes with the greedy policy, then close the env."""
        try:
            for i in range(episodes):
                state = self.env.reset()
                total_reward = 0
                done = False

                while not done:
                    self.env.render()
                    action = self._greedy_action(state)
                    state, reward, done, _ = self.env.step(action)
                    total_reward += reward

                logger.info("Episode: %s, reward %s", i, total_reward)
        finally:
            # Release the environment even if an episode raises.
            self.env.close()

    def _greedy_action(self, state):
        """Return the action with the highest predicted value for one state."""
        return np.argmax(self.model.predict(state[np.newaxis, :])[0])

    def _select_action(self, state):
        """Return a uniformly random action with probability ε, else the greedy one."""
        if np.random.rand() <= self.epsilon:
            # Exploration
            return np.random.randint(self.n_actions)
        # Exploitation
        return self._greedy_action(state)

    def _build_training_batch(self, batch):
        """Turn sampled ``[state, action, reward, new_state, done]`` rows into ``(X, y)``.

        ``y`` starts as the model's current predictions; only the entry of the
        action actually taken is replaced by its Q-learning target.
        """
        states = np.array([transition[0] for transition in batch])
        new_states = np.array([transition[3] for transition in batch])
        actions = np.array([transition[1] for transition in batch], dtype=int)
        rewards = np.array([transition[2] for transition in batch], dtype=float)
        terminal = np.array([transition[4] for transition in batch], dtype=bool)

        # np.array(...) copies, so the model's returned buffer is never mutated.
        y = np.array(self.model.predict(states), dtype=float)
        best_next = np.max(np.asarray(self.model.predict(new_states), dtype=float), axis=1)

        # Terminal transitions have no future return, so their target is the
        # reward alone; np.where also keeps any non-finite next-state value of
        # a terminal row out of the target.
        bootstrapped = rewards + self.gamma * best_next
        y[np.arange(len(batch)), actions] = np.where(terminal, rewards, bootstrapped)

        X = np.asarray(states, dtype=float)
        return X, y

    def _trim_replay(self):
        """Drop the oldest transitions so at most `memory_limit` remain."""
        excess = len(self.replay) - self.memory_limit
        if excess > 0:
            # One slice deletion instead of repeated O(n) pop(0) calls.
            del self.replay[:excess]

    def _decayed_epsilon(self, ep):
        """Return ε for episode `ep`: exponential decay from the initial ε towards `min_epsilon`."""
        return self.min_epsilon + (self.initial_epsilon - self.min_epsilon) * np.exp(-self.decay * ep)