"""Reinforcement Learning (Chapter 21)"""
import random
from collections import defaultdict
from mdp4e import MDP, policy_evaluation
# _________________________________________
# 21.2 Passive Reinforcement Learning
# 21.2.1 Direct utility estimation
[docs]
class PassiveDUEAgent:
"""
Passive (non-learning) agent that uses direct utility estimation
on a given MDP and policy::
import sys
from mdp import sequential_decision_environment
north = (0, 1)
south = (0,-1)
west = (-1, 0)
east = (1, 0)
policy = {(0, 2): east, (1, 2): east, (2, 2): east, (3, 2): None, (0, 1): north, (2, 1): north,
(3, 1): None, (0, 0): north, (1, 0): west, (2, 0): west, (3, 0): west,}
agent = PassiveDUEAgent(policy, sequential_decision_environment)
for i in range(200):
run_single_trial(agent,sequential_decision_environment)
agent.estimate_U()
agent.U[(0, 0)] > 0.2
True
"""
def __init__(self, pi, mdp):
self.pi = pi
self.mdp = mdp
self.U = {}
self.s = None
self.a = None
self.s_history = []
self.r_history = []
self.init = mdp.init
def __call__(self, percept):
s1, r1 = percept
self.s_history.append(s1)
self.r_history.append(r1)
##
##
if s1 in self.mdp.terminals:
self.s = self.a = None
else:
self.s, self.a = s1, self.pi[s1]
return self.a
[docs]
def estimate_U(self):
"""Update utility estimates from the most recent completed trial by direct
utility estimation: average the observed reward-to-go for each visited state,
blend it with the running estimate, and reset the trial history. Must be
called only once the MDP has reached a terminal state."""
# this function can be called only if the MDP has reached a terminal state
# it will also reset the mdp history
assert self.a is None, 'MDP is not in terminal state'
assert len(self.s_history) == len(self.r_history)
# calculating the utilities based on the current iteration
U2 = {s: [] for s in set(self.s_history)}
for i in range(len(self.s_history)):
s = self.s_history[i]
U2[s] += [sum(self.r_history[i:])]
U2 = {k: sum(v) / max(len(v), 1) for k, v in U2.items()}
# resetting history
self.s_history, self.r_history = [], []
# setting the new utilities to the average of the previous
# iteration and this one
for k in U2.keys():
if k in self.U.keys():
self.U[k] = (self.U[k] + U2[k]) / 2
else:
self.U[k] = U2[k]
return self.U
[docs]
def update_state(self, percept):
"""To be overridden in most cases. The default case
assumes the percept to be of type (state, reward)"""
return percept
# 21.2.2 Adaptive dynamic programming
[docs]
class PassiveADPAgent:
"""
[Figure 21.2]
Passive (non-learning) agent that uses adaptive dynamic programming
on a given MDP and policy::
import sys
from mdp import sequential_decision_environment
north = (0, 1)
south = (0,-1)
west = (-1, 0)
east = (1, 0)
policy = {(0, 2): east, (1, 2): east, (2, 2): east, (3, 2): None, (0, 1): north, (2, 1): north,
(3, 1): None, (0, 0): north, (1, 0): west, (2, 0): west, (3, 0): west,}
agent = PassiveADPAgent(policy, sequential_decision_environment)
for i in range(100):
run_single_trial(agent,sequential_decision_environment)
agent.U[(0, 0)] > 0.2
True
agent.U[(0, 1)] > 0.2
True
"""
[docs]
class ModelMDP(MDP):
"""Class for implementing modified Version of input MDP with
an editable transition model P and a custom function T."""
def __init__(self, init, actlist, terminals, gamma, states):
super().__init__(init, actlist, terminals, states=states, gamma=gamma)
nested_dict = lambda: defaultdict(nested_dict)
# StackOverflow:whats-the-best-way-to-initialize-a-dict-of-dicts-in-python
self.P = nested_dict()
[docs]
def T(self, s, a):
"""Return a list of tuples with probabilities for states
based on the learnt model P."""
return [(prob, res) for (res, prob) in self.P[(s, a)].items()]
def __init__(self, pi, mdp):
self.pi = pi
self.mdp = PassiveADPAgent.ModelMDP(mdp.init, mdp.actlist,
mdp.terminals, mdp.gamma, mdp.states)
self.U = {}
self.Nsa = defaultdict(int)
self.Ns1_sa = defaultdict(int)
self.s = None
self.a = None
self.visited = set() # keeping track of visited states
def __call__(self, percept):
s1, r1 = percept
mdp = self.mdp
R, P, terminals, pi = mdp.reward, mdp.P, mdp.terminals, self.pi
s, a, Nsa, Ns1_sa, U = self.s, self.a, self.Nsa, self.Ns1_sa, self.U
if s1 not in self.visited: # Reward is only known for visited state.
U[s1] = R[s1] = r1
self.visited.add(s1)
if s is not None:
Nsa[(s, a)] += 1
Ns1_sa[(s1, s, a)] += 1
# for each t such that Ns′|sa [t, s, a] is nonzero
for t in [res for (res, state, act), freq in Ns1_sa.items()
if (state, act) == (s, a) and freq != 0]:
P[(s, a)][t] = Ns1_sa[(t, s, a)] / Nsa[(s, a)]
self.U = policy_evaluation(pi, U, mdp)
##
##
self.Nsa, self.Ns1_sa = Nsa, Ns1_sa
if s1 in terminals:
self.s = self.a = None
else:
self.s, self.a = s1, self.pi[s1]
return self.a
[docs]
def update_state(self, percept):
"""To be overridden in most cases. The default case
assumes the percept to be of type (state, reward)."""
return percept
# 21.2.3 Temporal-difference learning
[docs]
class PassiveTDAgent:
"""
[Figure 21.4]
The abstract class for a Passive (non-learning) agent that uses
temporal differences to learn utility estimates. Override update_state
method to convert percept to state and reward. The mdp being provided
should be an instance of a subclass of the MDP Class::
import sys
from mdp import sequential_decision_environment
north = (0, 1)
south = (0,-1)
west = (-1, 0)
east = (1, 0)
policy = {(0, 2): east, (1, 2): east, (2, 2): east, (3, 2): None, (0, 1): north, (2, 1): north,
(3, 1): None, (0, 0): north, (1, 0): west, (2, 0): west, (3, 0): west,}
agent = PassiveTDAgent(policy, sequential_decision_environment, alpha=lambda n: 60./(59+n))
for i in range(200):
run_single_trial(agent,sequential_decision_environment)
agent.U[(0, 0)] > 0.2
True
agent.U[(0, 1)] > 0.2
True
"""
def __init__(self, pi, mdp, alpha=None):
self.pi = pi
self.U = {s: 0. for s in mdp.states}
self.Ns = {s: 0 for s in mdp.states}
self.s = None
self.a = None
self.r = None
self.gamma = mdp.gamma
self.terminals = mdp.terminals
if alpha:
self.alpha = alpha
else:
self.alpha = lambda n: 1 / (1 + n) # udacity video
def __call__(self, percept):
s1, r1 = self.update_state(percept)
pi, U, Ns, s, r = self.pi, self.U, self.Ns, self.s, self.r
alpha, gamma, terminals = self.alpha, self.gamma, self.terminals
if not Ns[s1]:
U[s1] = r1
if s is not None:
Ns[s] += 1
U[s] += alpha(Ns[s]) * (r + gamma * U[s1] - U[s])
if s1 in terminals:
self.s = self.a = self.r = None
else:
self.s, self.a, self.r = s1, pi[s1], r1
return self.a
[docs]
def update_state(self, percept):
"""To be overridden in most cases. The default case
assumes the percept to be of type (state, reward)."""
return percept
# __________________________________________
# 21.3. Active Reinforcement Learning
# 21.3.2 Learning an action-utility function
[docs]
class QLearningAgent:
"""
[Figure 21.8]
An exploratory Q-learning agent. It avoids having to learn the transition
model because the Q-value of a state can be related directly to those of
its neighbors::
import sys
from mdp import sequential_decision_environment
north = (0, 1)
south = (0,-1)
west = (-1, 0)
east = (1, 0)
policy = {(0, 2): east, (1, 2): east, (2, 2): east, (3, 2): None, (0, 1): north, (2, 1): north,
(3, 1): None, (0, 0): north, (1, 0): west, (2, 0): west, (3, 0): west,}
q_agent = QLearningAgent(sequential_decision_environment, Ne=5, Rplus=2, alpha=lambda n: 60./(59+n))
for i in range(200):
run_single_trial(q_agent,sequential_decision_environment)
q_agent.Q[((0, 1), (0, 1))] >= -0.5
True
q_agent.Q[((1, 0), (0, -1))] <= 0.5
True
"""
def __init__(self, mdp, Ne, Rplus, alpha=None):
self.gamma = mdp.gamma
self.terminals = mdp.terminals
self.all_act = mdp.actlist
self.Ne = Ne # iteration limit in exploration function
self.Rplus = Rplus # large value to assign before iteration limit
self.Q = defaultdict(float)
self.Nsa = defaultdict(float)
self.s = None
self.a = None
self.r = None
if alpha:
self.alpha = alpha
else:
self.alpha = lambda n: 1. / (1 + n) # udacity video
[docs]
def f(self, u, n):
"""Exploration function. Returns fixed Rplus until
agent has visited state, action a Ne number of times.
Same as ADP agent in book."""
if n < self.Ne:
return self.Rplus
else:
return u
[docs]
def actions_in_state(self, state):
"""Return actions possible in given state.
Useful for max and argmax."""
if state in self.terminals:
return [None]
else:
return self.all_act
def __call__(self, percept):
s1, r1 = self.update_state(percept)
Q, Nsa, s, a, r = self.Q, self.Nsa, self.s, self.a, self.r
alpha, gamma, terminals = self.alpha, self.gamma, self.terminals,
actions_in_state = self.actions_in_state
if s in terminals:
Q[s, None] = r1
if s is not None:
Nsa[s, a] += 1
Q[s, a] += alpha(Nsa[s, a]) * (r + gamma * max(Q[s1, a1]
for a1 in actions_in_state(s1)) - Q[s, a])
if s in terminals:
self.s = self.a = self.r = None
else:
self.s, self.r = s1, r1
self.a = max(actions_in_state(s1), key=lambda a1: self.f(Q[s1, a1], Nsa[s1, a1]))
return self.a
[docs]
def update_state(self, percept):
"""To be overridden in most cases. The default case
assumes the percept to be of type (state, reward)."""
return percept
[docs]
class SARSALearningAgent(QLearningAgent):
"""
[Section 22.3]
An on-policy temporal-difference control agent (SARSA: State-Action-Reward-
State-Action). It is identical to the Q-learning agent except for the update
rule: instead of bootstrapping on the maximum Q-value over next actions, SARSA
bootstraps on the Q-value of the action a1 that its exploration policy will
actually take in the next state. Being on-policy, SARSA learns the value of the
policy it is following, exploration included, rather than that of the greedy
policy.
"""
def __call__(self, percept):
s1, r1 = self.update_state(percept)
Q, Nsa, s, a, r = self.Q, self.Nsa, self.s, self.a, self.r
alpha, gamma, terminals = self.alpha, self.gamma, self.terminals
actions_in_state = self.actions_in_state
# pick the next action with the same exploration policy as Q-learning
a1 = max(actions_in_state(s1), key=lambda a2: self.f(Q[s1, a2], Nsa[s1, a2]))
if s in terminals:
Q[s, None] = r1
if s is not None:
Nsa[s, a] += 1
# on-policy update: bootstrap on the actually-chosen next action a1
Q[s, a] += alpha(Nsa[s, a]) * (r + gamma * Q[s1, a1] - Q[s, a])
if s in terminals:
self.s = self.a = self.r = None
else:
self.s, self.r = s1, r1
self.a = a1
return self.a
[docs]
def run_single_trial(agent_program, mdp):
"""Execute trial for given agent_program
and mdp. mdp should be an instance of subclass
of mdp.MDP """
def take_single_action(mdp, s, a):
"""
Select outcome of taking action a
in state s. Weighted Sampling.
"""
x = random.uniform(0, 1)
cumulative_probability = 0.0
for probability_state in mdp.T(s, a):
probability, state = probability_state
cumulative_probability += probability
if x < cumulative_probability:
break
return state
current_state = mdp.init
while True:
current_reward = mdp.R(current_state)
percept = (current_state, current_reward)
next_action = agent_program(percept)
if next_action is None:
break
current_state = take_single_action(mdp, current_state, next_action)