-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathblackjack_off_policy_incremental.py
91 lines (70 loc) · 2.63 KB
/
blackjack_off_policy_incremental.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import torch
import gym
from collections import defaultdict
# Blackjack environment shared by the rest of this script.
env = gym.make('Blackjack-v0')
def gen_random_policy(n_action):
    """Build a policy that assigns equal probability to every action.

    Args:
        n_action: number of available actions
    Returns:
        a function that takes a state (which it ignores) and returns the
        uniform probability tensor of length ``n_action``
    """
    # Computed once; every call of the returned policy hands back this same tensor.
    uniform = torch.ones(n_action) / n_action

    def policy_function(state):
        return uniform

    return policy_function
# Uniform-random policy over the environment's actions.
random_policy = gen_random_policy(env.action_space.n)
def run_episode(env, behavior_policy):
    r"""Roll out one complete episode, sampling every action from a behavior policy.

    Args:
        env: OpenAI Gym environment
        behavior_policy: callable taking a state and returning a tensor of
            action probabilities to sample from
    Returns:
        three parallel lists: the states visited, the actions taken in them,
        and the rewards received for those actions
    """
    visited, chosen, collected = [], [], []
    current = env.reset()
    while True:
        action_probs = behavior_policy(current)
        picked = torch.multinomial(action_probs, 1).item()
        # Record the state the action was taken in, before stepping replaces it.
        visited.append(current)
        chosen.append(picked)
        current, reward, finished, _ = env.step(picked)
        collected.append(reward)
        if finished:
            return visited, chosen, collected
def mc_control_off_policy_incremental(env, gamma, n_episode, behavior_policy):
    """Obtain the optimal policy with off-policy Monte Carlo control, updating the Q function incrementally.

    Each episode is generated with ``behavior_policy`` and then walked
    backwards.  Every visited (state, action) estimate is moved towards the
    observed return with a step size of ``weight / cumulative weight``
    (incremental weighted importance sampling).  That step size never exceeds
    1, so an estimate always stays between its previous value and the return.

    Args:
        env: OpenAI Gym environment
        gamma: discount factor
        n_episode: number of episodes
        behavior_policy: behavior policy; called with a state, it returns the
            action probabilities that state's action is sampled from
    Returns:
        the optimal Q-function, and optimal policy
    """
    n_action = env.action_space.n
    # Sum of the importance weights applied so far to each (state, action) pair.
    cumulative_weight = defaultdict(float)
    # Start every estimate at zero: torch.empty would hand back uninitialized
    # memory, and the incremental update below reads the previous value.
    Q = defaultdict(lambda: torch.zeros(n_action))
    for _ in range(n_episode):
        weight = 1.0
        states_t, actions_t, rewards_t = run_episode(env, behavior_policy)
        return_t = 0
        steps = zip(reversed(states_t), reversed(actions_t), reversed(rewards_t))
        for state_t, action_t, reward_t in steps:
            return_t = gamma * return_t + reward_t
            key = (state_t, action_t)
            cumulative_weight[key] += weight
            q_state = Q[state_t]
            step_size = weight / cumulative_weight[key]
            q_state[action_t] += step_size * (return_t - q_state[action_t])
            # The target policy is greedy w.r.t. Q.  Once the behavior action
            # deviates from it, earlier steps of this episode carry zero
            # importance weight, so stop processing the episode.
            if action_t != torch.argmax(q_state).item():
                break
            # Greedy target picks action_t with probability 1, so the ratio is 1 / b(a|s).
            weight *= 1.0 / float(behavior_policy(state_t)[action_t])
    policy = {
        state: torch.argmax(action_values).item()
        for state, action_values in Q.items()
    }
    return Q, policy
# Undiscounted returns, learned from episodes generated by the random policy.
gamma, n_episode = 1, 500000
optimal_Q, optimal_policy = mc_control_off_policy_incremental(
    env, gamma, n_episode, random_policy
)

# The value of a state is the largest action value stored for it.
optimal_value = defaultdict(float)
for state, action_values in optimal_Q.items():
    optimal_value[state] = action_values.max().item()

print("Optimal Q:\n", optimal_Q)
print("Optimal policy:\n", optimal_policy)
print("Optimal value:\n", optimal_value)