"""
Markov Decision Processes (Chapter 16)
First we define an MDP, and the special case of a GridMDP, in which
states are laid out in a 2-dimensional grid. We also represent a policy
as a dictionary of {state: action} pairs, and a Utility function as a
dictionary of {state: number} pairs. We then define the value_iteration
and policy_iteration algorithms.
"""
import random
from collections import defaultdict
import numpy as np
from utils4e import vector_add, orientations, turn_right, turn_left
[docs]
class MDP:
"""A Markov Decision Process, defined by an initial state, transition model,
and reward function. We also keep track of a gamma value, for use by
algorithms. The transition model is represented somewhat differently from
the text. Instead of P(s' | s, a) being a probability number for each
state/state/action triplet, we instead have T(s, a) return a
list of (p, s') pairs. We also keep track of the possible states,
terminal states, and actions for each state. [Page 646]"""
def __init__(self, init, actlist, terminals, transitions=None, reward=None, states=None, gamma=0.9):
if not (0 < gamma <= 1):
raise ValueError("An MDP must have 0 < gamma <= 1")
# collect states from transitions table if not passed.
self.states = states or self.get_states_from_transitions(transitions)
self.init = init
if isinstance(actlist, list):
# if actlist is a list, all states have the same actions
self.actlist = actlist
elif isinstance(actlist, dict):
# if actlist is a dict, different actions for each state
self.actlist = actlist
elif isinstance(actlist, set):
# if actlist is a set, convert it to a list so actions are indexable
self.actlist = list(actlist)
self.terminals = terminals
self.transitions = transitions or {}
if not self.transitions:
print("Warning: Transition table is empty.")
self.gamma = gamma
self.reward = reward or {s: 0 for s in self.states}
# self.check_consistency()
[docs]
def R(self, state):
"""Return a numeric reward for this state."""
return self.reward[state]
[docs]
def T(self, state, action):
"""Transition model. From a state and an action, return a list
of (probability, result-state) pairs."""
if not self.transitions:
raise ValueError("Transition model is missing")
else:
return self.transitions[state][action]
[docs]
def actions(self, state):
"""Return a list of actions that can be performed in this state. By default, a
fixed list of actions, except for terminal states. Override this
method if you need to specialize by state."""
if state in self.terminals:
return [None]
else:
return self.actlist
[docs]
def get_states_from_transitions(self, transitions):
"""Return the set of all states mentioned in the transition model, both as source
states and as reachable next states (or None if transitions is not a dict)."""
if isinstance(transitions, dict):
s1 = set(transitions.keys())
s2 = set(tr[1] for actions in transitions.values()
for effects in actions.values()
for tr in effects)
return s1.union(s2)
else:
print('Could not retrieve states from transitions')
return None
[docs]
def check_consistency(self):
"""Assert that the MDP is well formed: the states match those in the transitions,
the initial and terminal states are valid, rewards are defined for every state, and
each action's outcome probabilities sum to 1."""
# check that all states in transitions are valid
assert set(self.states) == self.get_states_from_transitions(self.transitions)
# check that init is a valid state
assert self.init in self.states
# check reward for each state
assert set(self.reward.keys()) == set(self.states)
# check that all terminals are valid states
assert all(t in self.states for t in self.terminals)
# check that probability distributions for all actions sum to 1
for s1, actions in self.transitions.items():
for a in actions.keys():
s = 0
for o in actions[a]:
s += o[0]
assert abs(s - 1) < 0.001
[docs]
class MDP2(MDP):
"""
Inherits from MDP. Handles terminal states, and transitions to and from terminal states better.
"""
def __init__(self, init, actlist, terminals, transitions, reward=None, gamma=0.9):
MDP.__init__(self, init, actlist, terminals, transitions, reward, gamma=gamma)
[docs]
def T(self, state, action):
"""Return a list of (probability, next-state) pairs for taking the action in the
given state; a None action stays in place with probability 0.0."""
if action is None:
return [(0.0, state)]
else:
return self.transitions[state][action]
[docs]
class GridMDP(MDP):
"""A two-dimensional grid MDP, as in [Figure 16.1]. All you have to do is
specify the grid as a list of lists of rewards; use None for an obstacle
(unreachable state). Also, you should specify the terminal states.
An action is an (x, y) unit vector; e.g. (1, 0) means move east."""
def __init__(self, grid, terminals, init=(0, 0), gamma=.9):
grid.reverse() # because we want row 0 on bottom, not on top
reward = {}
states = set()
self.rows = len(grid)
self.cols = len(grid[0])
self.grid = grid
for x in range(self.cols):
for y in range(self.rows):
if grid[y][x]:
states.add((x, y))
reward[(x, y)] = grid[y][x]
self.states = states
actlist = orientations
transitions = {}
for s in states:
transitions[s] = {}
for a in actlist:
transitions[s][a] = self.calculate_T(s, a)
MDP.__init__(self, init, actlist=actlist,
terminals=terminals, transitions=transitions,
reward=reward, states=states, gamma=gamma)
[docs]
def calculate_T(self, state, action):
"""Return the (probability, next-state) outcomes for the intended action: 0.8 of
moving as intended and 0.1 each of veering to the right or left."""
if action:
return [(0.8, self.go(state, action)),
(0.1, self.go(state, turn_right(action))),
(0.1, self.go(state, turn_left(action)))]
else:
return [(0.0, state)]
[docs]
def T(self, state, action):
"""Return the list of (probability, next-state) pairs for the action in the given
state; a falsy action stays in place with probability 0.0."""
return self.transitions[state][action] if action else [(0.0, state)]
[docs]
def go(self, state, direction):
"""Return the state that results from going in this direction."""
state1 = tuple(vector_add(state, direction))
return state1 if state1 in self.states else state
[docs]
def to_grid(self, mapping):
"""Convert a mapping from (x, y) to v into a [[..., v, ...]] grid."""
return list(reversed([[mapping.get((x, y), None)
for x in range(self.cols)]
for y in range(self.rows)]))
[docs]
def to_arrows(self, policy):
"""Render a policy as a grid of arrow characters (``>``, ``^``, ``<``, ``v``, with
``.`` for a None action) laid out to match the grid."""
chars = {(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'}
return self.to_grid({s: chars[a] for (s, a) in policy.items()})
# ______________________________________________________________________________
""" [Figure 16.1]
A 4x3 grid environment that presents the agent with a sequential decision problem.
"""
sequential_decision_environment = GridMDP([[-0.04, -0.04, -0.04, +1],
[-0.04, None, -0.04, -1],
[-0.04, -0.04, -0.04, -0.04]],
terminals=[(3, 2), (3, 1)])
# ______________________________________________________________________________
# 16.1.3 The Bellman equation for utilities
[docs]
def q_value(mdp, s, a, U):
"""Return the Q-value of taking action ``a`` in state ``s`` given the utility estimates
``U``, i.e. the expected sum of the immediate reward and the discounted utility of the
successor states. With no action it falls back to the reward of ``s``."""
if not a:
return mdp.R(s)
res = 0
for p, s_prime in mdp.T(s, a):
res += p * (mdp.R(s) + mdp.gamma * U[s_prime])
return res
# Dynamic decision networks (DDNs) are solved online by the belief-state
# look-ahead agent `pomdp_lookahead` defined below, with belief updates via
# `update_belief` (POMDP filtering).
# ______________________________________________________________________________
# 16.2 Algorithms for MDPs
# 16.2.1 Value Iteration
[docs]
def value_iteration(mdp, epsilon=0.001):
"""Solving an MDP by value iteration. [Figure 16.6]"""
U1 = {s: 0 for s in mdp.states}
R, T, gamma = mdp.R, mdp.T, mdp.gamma
while True:
U = U1.copy()
delta = 0
for s in mdp.states:
# U1[s] = R(s) + gamma * max(sum(p * U[s1] for (p, s1) in T(s, a))
# for a in mdp.actions(s))
U1[s] = max(q_value(mdp, s, a, U) for a in mdp.actions(s))
delta = max(delta, abs(U1[s] - U[s]))
if delta <= epsilon * (1 - gamma) / gamma:
return U
# ______________________________________________________________________________
# 16.2.2 Policy Iteration
[docs]
def best_policy(mdp, U):
"""Given an MDP and a utility function U, determine the best policy,
as a mapping from state to action."""
pi = {}
for s in mdp.states:
pi[s] = max(mdp.actions(s), key=lambda a: q_value(mdp, s, a, U))
return pi
[docs]
def expected_utility(a, s, U, mdp):
"""The expected utility of doing a in state s, according to the MDP and U."""
return sum(p * U[s1] for (p, s1) in mdp.T(s, a))
[docs]
def policy_iteration(mdp):
"""Solve an MDP by policy iteration [Figure 17.7]"""
U = {s: 0 for s in mdp.states}
pi = {s: random.choice(mdp.actions(s)) for s in mdp.states}
while True:
U = policy_evaluation(pi, U, mdp)
unchanged = True
for s in mdp.states:
a_star = max(mdp.actions(s), key=lambda a: q_value(mdp, s, a, U))
# a = max(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp))
if q_value(mdp, s, a_star, U) > q_value(mdp, s, pi[s], U):
pi[s] = a_star
unchanged = False
if unchanged:
return pi
[docs]
def policy_evaluation(pi, U, mdp, k=20):
"""Return an updated utility mapping U from each state in the MDP to its
utility, using an approximation (modified policy iteration)."""
R, T, gamma = mdp.R, mdp.T, mdp.gamma
for i in range(k):
for s in mdp.states:
U[s] = R(s) + gamma * sum(p * U[s1] for (p, s1) in T(s, pi[s]))
return U
# ___________________________________________________________________
# 16.4 Partially Observed MDPs
[docs]
class POMDP(MDP):
r"""A Partially Observable Markov Decision Process, defined by
a transition model P(s'\|s,a), actions A(s), a reward function R(s),
and a sensor model P(e\|s). We also keep track of a gamma value,
for use by algorithms. The transition and the sensor models
are defined as matrices. We also keep track of the possible states
and actions for each state. [Page 659]."""
def __init__(self, actions, transitions=None, evidences=None, rewards=None, states=None, gamma=0.95):
"""Initialize variables of the pomdp"""
if not (0 < gamma <= 1):
raise ValueError('A POMDP must have 0 < gamma <= 1')
self.states = states
self.actions = actions
# transition model cannot be undefined
self.t_prob = transitions or {}
if not self.t_prob:
print('Warning: Transition model is undefined')
# sensor model cannot be undefined
self.e_prob = evidences or {}
if not self.e_prob:
print('Warning: Sensor model is undefined')
self.gamma = gamma
self.rewards = rewards
[docs]
def remove_dominated_plans(self, input_values):
"""
Remove dominated plans.
This method finds all the lines contributing to the
upper surface and removes those which don't.
"""
values = [val for action in input_values for val in input_values[action]]
values.sort(key=lambda x: x[0], reverse=True)
best = [values[0]]
y1_max = max(val[1] for val in values)
tgt = values[0]
prev_b = 0
prev_ix = 0
while tgt[1] != y1_max:
min_b = 1
min_ix = 0
for i in range(prev_ix + 1, len(values)):
if values[i][0] - tgt[0] + tgt[1] - values[i][1] != 0:
trans_b = (values[i][0] - tgt[0]) / (values[i][0] - tgt[0] + tgt[1] - values[i][1])
if 0 <= trans_b <= 1 and trans_b > prev_b and trans_b < min_b:
min_b = trans_b
min_ix = i
prev_b = min_b
prev_ix = min_ix
tgt = values[min_ix]
best.append(tgt)
return self.generate_mapping(best, input_values)
[docs]
def remove_dominated_plans_fast(self, input_values):
"""
Remove dominated plans using approximations.
Resamples the upper boundary at intervals of 100 and
finds the maximum values at these points.
"""
values = [val for action in input_values for val in input_values[action]]
values.sort(key=lambda x: x[0], reverse=True)
best = []
sr = 100
for i in range(sr + 1):
x = i / float(sr)
maximum = (values[0][1] - values[0][0]) * x + values[0][0]
tgt = values[0]
for value in values:
val = (value[1] - value[0]) * x + value[0]
if val > maximum:
maximum = val
tgt = value
if all(any(tgt != v) for v in best):
best.append(np.array(tgt))
return self.generate_mapping(best, input_values)
[docs]
def generate_mapping(self, best, input_values):
"""Generate mappings after removing dominated plans"""
mapping = defaultdict(list)
for value in best:
for action in input_values:
if any(all(value == v) for v in input_values[action]):
mapping[action].append(value)
return mapping
[docs]
def max_difference(self, U1, U2):
"""Find maximum difference between two utility mappings"""
for k, v in U1.items():
sum1 = 0
for element in U1[k]:
sum1 += sum(element)
sum2 = 0
for element in U2[k]:
sum2 += sum(element)
return abs(sum1 - sum2)
[docs]
class Matrix:
"""Matrix operations class"""
[docs]
@staticmethod
def add(A, B):
"""Add two matrices A and B"""
res = []
for i in range(len(A)):
row = []
for j in range(len(A[0])):
row.append(A[i][j] + B[i][j])
res.append(row)
return res
[docs]
@staticmethod
def scalar_multiply(a, B):
"""Multiply scalar a to matrix B"""
for i in range(len(B)):
for j in range(len(B[0])):
B[i][j] = a * B[i][j]
return B
[docs]
@staticmethod
def multiply(A, B):
"""Multiply two matrices A and B element-wise"""
matrix = []
for i in range(len(A)):
row = []
for j in range(len(A[0])):
row.append(A[i][j] * B[i][j])
matrix.append(row)
return matrix
[docs]
@staticmethod
def matmul(A, B):
"""Inner-product of two matrices"""
return [[sum(ele_a * ele_b for ele_a, ele_b in zip(row_a, col_b)) for col_b in list(zip(*B))] for row_a in A]
[docs]
@staticmethod
def transpose(A):
"""Transpose a matrix"""
return [list(i) for i in zip(*A)]
[docs]
def pomdp_value_iteration(pomdp, epsilon=0.1):
"""Solving a POMDP by value iteration."""
U = {'': [[0] * len(pomdp.states)]}
count = 0
while True:
count += 1
prev_U = U
values = [val for action in U for val in U[action]]
value_matxs = []
for i in values:
for j in values:
value_matxs.append([i, j])
U1 = defaultdict(list)
for action in pomdp.actions:
for u in value_matxs:
u1 = Matrix.matmul(Matrix.matmul(pomdp.t_prob[int(action)],
Matrix.multiply(pomdp.e_prob[int(action)], Matrix.transpose(u))),
[[1], [1]])
u1 = Matrix.add(Matrix.scalar_multiply(pomdp.gamma, Matrix.transpose(u1)), [pomdp.rewards[int(action)]])
U1[action].append(u1[0])
U = pomdp.remove_dominated_plans_fast(U1)
# replace with U = pomdp.remove_dominated_plans(U1) for accurate calculations
if count > 10:
if pomdp.max_difference(U, prev_U) < epsilon * (1 - pomdp.gamma) / pomdp.gamma:
return U
[docs]
def update_belief(pomdp, belief, action, observation):
"""
[Equation 17.17]
POMDP filtering: update the belief state (a probability distribution over the
states) after executing 'action' and perceiving 'observation'. The prediction
step propagates the belief through the transition model and the update step
weights it by the sensor model, then renormalizes::
b'(s') = alpha * P(o | s') * sum_s P(s' | s, a) b(s)
"""
transition, sensor = pomdp.t_prob[int(action)], pomdp.e_prob[int(action)]
n = len(belief)
# prediction: b_pred(s') = sum_s b(s) P(s' | s, a)
predicted = [sum(belief[s] * transition[s][sp] for s in range(n)) for sp in range(n)]
# update: weight each predicted state by the likelihood of the observation
updated = [sensor[sp][observation] * predicted[sp] for sp in range(n)]
total = sum(updated)
return [b / total for b in updated] if total else predicted
[docs]
def pomdp_lookahead(pomdp, belief, depth):
"""
[Section 17.5 - Dynamic Decision Networks]
Online decision making for a POMDP modeled as a dynamic decision network
(DDN): the network is projected 'depth' steps into the future and solved by
belief-state expectimax search. Decision nodes range over the belief states
and chance nodes branch over the possible observations, with the belief
updated by POMDP filtering ([Equation 17.17]) along each branch. Returns the
action that maximizes the expected discounted utility from 'belief'.
"""
def utility(belief, depth):
if depth == 0:
return 0
return max(q_value(belief, action, depth) for action in pomdp.actions)
def q_value(belief, action, depth):
# expected immediate reward of taking 'action' in the current belief
reward = sum(belief[s] * pomdp.rewards[int(action)][s] for s in range(len(belief)))
# expectation over the possible observations of the next belief's utility
sensor = pomdp.e_prob[int(action)]
predicted = [sum(belief[s] * pomdp.t_prob[int(action)][s][sp] for s in range(len(belief)))
for sp in range(len(belief))]
future = 0
for observation in range(len(sensor[0])):
# P(observation | belief, action)
p_o = sum(predicted[sp] * sensor[sp][observation] for sp in range(len(belief)))
if p_o > 0:
future += p_o * utility(update_belief(pomdp, belief, action, observation), depth - 1)
return reward + pomdp.gamma * future
return max(pomdp.actions, key=lambda action: q_value(belief, action, depth))
__doc__ += """
>>> pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .01))
>>> sequential_decision_environment.to_arrows(pi)
[['>', '>', '>', '.'], ['^', None, '^', '.'], ['^', '>', '^', '<']]
>>> from utils import print_table
>>> print_table(sequential_decision_environment.to_arrows(pi))
> > > .
^ None ^ .
^ > ^ <
>>> print_table(sequential_decision_environment.to_arrows(policy_iteration(sequential_decision_environment)))
> > > .
^ None ^ .
^ > ^ <
""" # noqa
"""
s = { 'a' : { 'plan1' : [(0.2, 'a'), (0.3, 'b'), (0.3, 'c'), (0.2, 'd')],
'plan2' : [(0.4, 'a'), (0.15, 'b'), (0.45, 'c')],
'plan3' : [(0.2, 'a'), (0.5, 'b'), (0.3, 'c')],
},
'b' : { 'plan1' : [(0.2, 'a'), (0.6, 'b'), (0.2, 'c'), (0.1, 'd')],
'plan2' : [(0.6, 'a'), (0.2, 'b'), (0.1, 'c'), (0.1, 'd')],
'plan3' : [(0.3, 'a'), (0.3, 'b'), (0.4, 'c')],
},
'c' : { 'plan1' : [(0.3, 'a'), (0.5, 'b'), (0.1, 'c'), (0.1, 'd')],
'plan2' : [(0.5, 'a'), (0.3, 'b'), (0.1, 'c'), (0.1, 'd')],
'plan3' : [(0.1, 'a'), (0.3, 'b'), (0.1, 'c'), (0.5, 'd')],
},
}
"""