Source code for mdp4e

"""
Markov Decision Processes (Chapter 16)

First we define an MDP, and the special case of a GridMDP, in which
states are laid out in a 2-dimensional grid. We also represent a policy
as a dictionary of {state: action} pairs, and a Utility function as a
dictionary of {state: number} pairs. We then define the value_iteration
and policy_iteration algorithms.
"""

import random
from collections import defaultdict

import numpy as np

from utils4e import vector_add, orientations, turn_right, turn_left


[docs] class MDP: """A Markov Decision Process, defined by an initial state, transition model, and reward function. We also keep track of a gamma value, for use by algorithms. The transition model is represented somewhat differently from the text. Instead of P(s' | s, a) being a probability number for each state/state/action triplet, we instead have T(s, a) return a list of (p, s') pairs. We also keep track of the possible states, terminal states, and actions for each state. [Page 646]""" def __init__(self, init, actlist, terminals, transitions=None, reward=None, states=None, gamma=0.9): if not (0 < gamma <= 1): raise ValueError("An MDP must have 0 < gamma <= 1") # collect states from transitions table if not passed. self.states = states or self.get_states_from_transitions(transitions) self.init = init if isinstance(actlist, list): # if actlist is a list, all states have the same actions self.actlist = actlist elif isinstance(actlist, dict): # if actlist is a dict, different actions for each state self.actlist = actlist elif isinstance(actlist, set): # if actlist is a set, convert it to a list so actions are indexable self.actlist = list(actlist) self.terminals = terminals self.transitions = transitions or {} if not self.transitions: print("Warning: Transition table is empty.") self.gamma = gamma self.reward = reward or {s: 0 for s in self.states} # self.check_consistency()
[docs] def R(self, state): """Return a numeric reward for this state.""" return self.reward[state]
[docs] def T(self, state, action): """Transition model. From a state and an action, return a list of (probability, result-state) pairs.""" if not self.transitions: raise ValueError("Transition model is missing") else: return self.transitions[state][action]
[docs] def actions(self, state): """Return a list of actions that can be performed in this state. By default, a fixed list of actions, except for terminal states. Override this method if you need to specialize by state.""" if state in self.terminals: return [None] else: return self.actlist
[docs] def get_states_from_transitions(self, transitions): """Return the set of all states mentioned in the transition model, both as source states and as reachable next states (or None if transitions is not a dict).""" if isinstance(transitions, dict): s1 = set(transitions.keys()) s2 = set(tr[1] for actions in transitions.values() for effects in actions.values() for tr in effects) return s1.union(s2) else: print('Could not retrieve states from transitions') return None
[docs] def check_consistency(self): """Assert that the MDP is well formed: the states match those in the transitions, the initial and terminal states are valid, rewards are defined for every state, and each action's outcome probabilities sum to 1.""" # check that all states in transitions are valid assert set(self.states) == self.get_states_from_transitions(self.transitions) # check that init is a valid state assert self.init in self.states # check reward for each state assert set(self.reward.keys()) == set(self.states) # check that all terminals are valid states assert all(t in self.states for t in self.terminals) # check that probability distributions for all actions sum to 1 for s1, actions in self.transitions.items(): for a in actions.keys(): s = 0 for o in actions[a]: s += o[0] assert abs(s - 1) < 0.001
[docs] class MDP2(MDP): """ Inherits from MDP. Handles terminal states, and transitions to and from terminal states better. """ def __init__(self, init, actlist, terminals, transitions, reward=None, gamma=0.9): MDP.__init__(self, init, actlist, terminals, transitions, reward, gamma=gamma)
[docs] def T(self, state, action): """Return a list of (probability, next-state) pairs for taking the action in the given state; a None action stays in place with probability 0.0.""" if action is None: return [(0.0, state)] else: return self.transitions[state][action]
[docs] class GridMDP(MDP): """A two-dimensional grid MDP, as in [Figure 16.1]. All you have to do is specify the grid as a list of lists of rewards; use None for an obstacle (unreachable state). Also, you should specify the terminal states. An action is an (x, y) unit vector; e.g. (1, 0) means move east.""" def __init__(self, grid, terminals, init=(0, 0), gamma=.9): grid.reverse() # because we want row 0 on bottom, not on top reward = {} states = set() self.rows = len(grid) self.cols = len(grid[0]) self.grid = grid for x in range(self.cols): for y in range(self.rows): if grid[y][x]: states.add((x, y)) reward[(x, y)] = grid[y][x] self.states = states actlist = orientations transitions = {} for s in states: transitions[s] = {} for a in actlist: transitions[s][a] = self.calculate_T(s, a) MDP.__init__(self, init, actlist=actlist, terminals=terminals, transitions=transitions, reward=reward, states=states, gamma=gamma)
[docs] def calculate_T(self, state, action): """Return the (probability, next-state) outcomes for the intended action: 0.8 of moving as intended and 0.1 each of veering to the right or left.""" if action: return [(0.8, self.go(state, action)), (0.1, self.go(state, turn_right(action))), (0.1, self.go(state, turn_left(action)))] else: return [(0.0, state)]
[docs] def T(self, state, action): """Return the list of (probability, next-state) pairs for the action in the given state; a falsy action stays in place with probability 0.0.""" return self.transitions[state][action] if action else [(0.0, state)]
[docs] def go(self, state, direction): """Return the state that results from going in this direction.""" state1 = tuple(vector_add(state, direction)) return state1 if state1 in self.states else state
[docs] def to_grid(self, mapping): """Convert a mapping from (x, y) to v into a [[..., v, ...]] grid.""" return list(reversed([[mapping.get((x, y), None) for x in range(self.cols)] for y in range(self.rows)]))
[docs] def to_arrows(self, policy): """Render a policy as a grid of arrow characters (``>``, ``^``, ``<``, ``v``, with ``.`` for a None action) laid out to match the grid.""" chars = {(1, 0): '>', (0, 1): '^', (-1, 0): '<', (0, -1): 'v', None: '.'} return self.to_grid({s: chars[a] for (s, a) in policy.items()})
# ______________________________________________________________________________ """ [Figure 16.1] A 4x3 grid environment that presents the agent with a sequential decision problem. """ sequential_decision_environment = GridMDP([[-0.04, -0.04, -0.04, +1], [-0.04, None, -0.04, -1], [-0.04, -0.04, -0.04, -0.04]], terminals=[(3, 2), (3, 1)]) # ______________________________________________________________________________ # 16.1.3 The Bellman equation for utilities
[docs] def q_value(mdp, s, a, U): """Return the Q-value of taking action ``a`` in state ``s`` given the utility estimates ``U``, i.e. the expected sum of the immediate reward and the discounted utility of the successor states. With no action it falls back to the reward of ``s``.""" if not a: return mdp.R(s) res = 0 for p, s_prime in mdp.T(s, a): res += p * (mdp.R(s) + mdp.gamma * U[s_prime]) return res
# Dynamic decision networks (DDNs) are solved online by the belief-state # look-ahead agent `pomdp_lookahead` defined below, with belief updates via # `update_belief` (POMDP filtering). # ______________________________________________________________________________ # 16.2 Algorithms for MDPs # 16.2.1 Value Iteration
[docs] def value_iteration(mdp, epsilon=0.001): """Solving an MDP by value iteration. [Figure 16.6]""" U1 = {s: 0 for s in mdp.states} R, T, gamma = mdp.R, mdp.T, mdp.gamma while True: U = U1.copy() delta = 0 for s in mdp.states: # U1[s] = R(s) + gamma * max(sum(p * U[s1] for (p, s1) in T(s, a)) # for a in mdp.actions(s)) U1[s] = max(q_value(mdp, s, a, U) for a in mdp.actions(s)) delta = max(delta, abs(U1[s] - U[s])) if delta <= epsilon * (1 - gamma) / gamma: return U
# ______________________________________________________________________________ # 16.2.2 Policy Iteration
[docs] def best_policy(mdp, U): """Given an MDP and a utility function U, determine the best policy, as a mapping from state to action.""" pi = {} for s in mdp.states: pi[s] = max(mdp.actions(s), key=lambda a: q_value(mdp, s, a, U)) return pi
[docs] def expected_utility(a, s, U, mdp): """The expected utility of doing a in state s, according to the MDP and U.""" return sum(p * U[s1] for (p, s1) in mdp.T(s, a))
[docs] def policy_iteration(mdp): """Solve an MDP by policy iteration [Figure 17.7]""" U = {s: 0 for s in mdp.states} pi = {s: random.choice(mdp.actions(s)) for s in mdp.states} while True: U = policy_evaluation(pi, U, mdp) unchanged = True for s in mdp.states: a_star = max(mdp.actions(s), key=lambda a: q_value(mdp, s, a, U)) # a = max(mdp.actions(s), key=lambda a: expected_utility(a, s, U, mdp)) if q_value(mdp, s, a_star, U) > q_value(mdp, s, pi[s], U): pi[s] = a_star unchanged = False if unchanged: return pi
[docs] def policy_evaluation(pi, U, mdp, k=20): """Return an updated utility mapping U from each state in the MDP to its utility, using an approximation (modified policy iteration).""" R, T, gamma = mdp.R, mdp.T, mdp.gamma for i in range(k): for s in mdp.states: U[s] = R(s) + gamma * sum(p * U[s1] for (p, s1) in T(s, pi[s])) return U
# ___________________________________________________________________ # 16.4 Partially Observed MDPs
[docs] class POMDP(MDP): r"""A Partially Observable Markov Decision Process, defined by a transition model P(s'\|s,a), actions A(s), a reward function R(s), and a sensor model P(e\|s). We also keep track of a gamma value, for use by algorithms. The transition and the sensor models are defined as matrices. We also keep track of the possible states and actions for each state. [Page 659].""" def __init__(self, actions, transitions=None, evidences=None, rewards=None, states=None, gamma=0.95): """Initialize variables of the pomdp""" if not (0 < gamma <= 1): raise ValueError('A POMDP must have 0 < gamma <= 1') self.states = states self.actions = actions # transition model cannot be undefined self.t_prob = transitions or {} if not self.t_prob: print('Warning: Transition model is undefined') # sensor model cannot be undefined self.e_prob = evidences or {} if not self.e_prob: print('Warning: Sensor model is undefined') self.gamma = gamma self.rewards = rewards
[docs] def remove_dominated_plans(self, input_values): """ Remove dominated plans. This method finds all the lines contributing to the upper surface and removes those which don't. """ values = [val for action in input_values for val in input_values[action]] values.sort(key=lambda x: x[0], reverse=True) best = [values[0]] y1_max = max(val[1] for val in values) tgt = values[0] prev_b = 0 prev_ix = 0 while tgt[1] != y1_max: min_b = 1 min_ix = 0 for i in range(prev_ix + 1, len(values)): if values[i][0] - tgt[0] + tgt[1] - values[i][1] != 0: trans_b = (values[i][0] - tgt[0]) / (values[i][0] - tgt[0] + tgt[1] - values[i][1]) if 0 <= trans_b <= 1 and trans_b > prev_b and trans_b < min_b: min_b = trans_b min_ix = i prev_b = min_b prev_ix = min_ix tgt = values[min_ix] best.append(tgt) return self.generate_mapping(best, input_values)
[docs] def remove_dominated_plans_fast(self, input_values): """ Remove dominated plans using approximations. Resamples the upper boundary at intervals of 100 and finds the maximum values at these points. """ values = [val for action in input_values for val in input_values[action]] values.sort(key=lambda x: x[0], reverse=True) best = [] sr = 100 for i in range(sr + 1): x = i / float(sr) maximum = (values[0][1] - values[0][0]) * x + values[0][0] tgt = values[0] for value in values: val = (value[1] - value[0]) * x + value[0] if val > maximum: maximum = val tgt = value if all(any(tgt != v) for v in best): best.append(np.array(tgt)) return self.generate_mapping(best, input_values)
[docs] def generate_mapping(self, best, input_values): """Generate mappings after removing dominated plans""" mapping = defaultdict(list) for value in best: for action in input_values: if any(all(value == v) for v in input_values[action]): mapping[action].append(value) return mapping
[docs] def max_difference(self, U1, U2): """Find maximum difference between two utility mappings""" for k, v in U1.items(): sum1 = 0 for element in U1[k]: sum1 += sum(element) sum2 = 0 for element in U2[k]: sum2 += sum(element) return abs(sum1 - sum2)
[docs] class Matrix: """Matrix operations class"""
[docs] @staticmethod def add(A, B): """Add two matrices A and B""" res = [] for i in range(len(A)): row = [] for j in range(len(A[0])): row.append(A[i][j] + B[i][j]) res.append(row) return res
[docs] @staticmethod def scalar_multiply(a, B): """Multiply scalar a to matrix B""" for i in range(len(B)): for j in range(len(B[0])): B[i][j] = a * B[i][j] return B
[docs] @staticmethod def multiply(A, B): """Multiply two matrices A and B element-wise""" matrix = [] for i in range(len(A)): row = [] for j in range(len(A[0])): row.append(A[i][j] * B[i][j]) matrix.append(row) return matrix
[docs] @staticmethod def matmul(A, B): """Inner-product of two matrices""" return [[sum(ele_a * ele_b for ele_a, ele_b in zip(row_a, col_b)) for col_b in list(zip(*B))] for row_a in A]
[docs] @staticmethod def transpose(A): """Transpose a matrix""" return [list(i) for i in zip(*A)]
[docs] def pomdp_value_iteration(pomdp, epsilon=0.1): """Solving a POMDP by value iteration.""" U = {'': [[0] * len(pomdp.states)]} count = 0 while True: count += 1 prev_U = U values = [val for action in U for val in U[action]] value_matxs = [] for i in values: for j in values: value_matxs.append([i, j]) U1 = defaultdict(list) for action in pomdp.actions: for u in value_matxs: u1 = Matrix.matmul(Matrix.matmul(pomdp.t_prob[int(action)], Matrix.multiply(pomdp.e_prob[int(action)], Matrix.transpose(u))), [[1], [1]]) u1 = Matrix.add(Matrix.scalar_multiply(pomdp.gamma, Matrix.transpose(u1)), [pomdp.rewards[int(action)]]) U1[action].append(u1[0]) U = pomdp.remove_dominated_plans_fast(U1) # replace with U = pomdp.remove_dominated_plans(U1) for accurate calculations if count > 10: if pomdp.max_difference(U, prev_U) < epsilon * (1 - pomdp.gamma) / pomdp.gamma: return U
[docs] def update_belief(pomdp, belief, action, observation): """ [Equation 17.17] POMDP filtering: update the belief state (a probability distribution over the states) after executing 'action' and perceiving 'observation'. The prediction step propagates the belief through the transition model and the update step weights it by the sensor model, then renormalizes:: b'(s') = alpha * P(o | s') * sum_s P(s' | s, a) b(s) """ transition, sensor = pomdp.t_prob[int(action)], pomdp.e_prob[int(action)] n = len(belief) # prediction: b_pred(s') = sum_s b(s) P(s' | s, a) predicted = [sum(belief[s] * transition[s][sp] for s in range(n)) for sp in range(n)] # update: weight each predicted state by the likelihood of the observation updated = [sensor[sp][observation] * predicted[sp] for sp in range(n)] total = sum(updated) return [b / total for b in updated] if total else predicted
[docs] def pomdp_lookahead(pomdp, belief, depth): """ [Section 17.5 - Dynamic Decision Networks] Online decision making for a POMDP modeled as a dynamic decision network (DDN): the network is projected 'depth' steps into the future and solved by belief-state expectimax search. Decision nodes range over the belief states and chance nodes branch over the possible observations, with the belief updated by POMDP filtering ([Equation 17.17]) along each branch. Returns the action that maximizes the expected discounted utility from 'belief'. """ def utility(belief, depth): if depth == 0: return 0 return max(q_value(belief, action, depth) for action in pomdp.actions) def q_value(belief, action, depth): # expected immediate reward of taking 'action' in the current belief reward = sum(belief[s] * pomdp.rewards[int(action)][s] for s in range(len(belief))) # expectation over the possible observations of the next belief's utility sensor = pomdp.e_prob[int(action)] predicted = [sum(belief[s] * pomdp.t_prob[int(action)][s][sp] for s in range(len(belief))) for sp in range(len(belief))] future = 0 for observation in range(len(sensor[0])): # P(observation | belief, action) p_o = sum(predicted[sp] * sensor[sp][observation] for sp in range(len(belief))) if p_o > 0: future += p_o * utility(update_belief(pomdp, belief, action, observation), depth - 1) return reward + pomdp.gamma * future return max(pomdp.actions, key=lambda action: q_value(belief, action, depth))
__doc__ += """ >>> pi = best_policy(sequential_decision_environment, value_iteration(sequential_decision_environment, .01)) >>> sequential_decision_environment.to_arrows(pi) [['>', '>', '>', '.'], ['^', None, '^', '.'], ['^', '>', '^', '<']] >>> from utils import print_table >>> print_table(sequential_decision_environment.to_arrows(pi)) > > > . ^ None ^ . ^ > ^ < >>> print_table(sequential_decision_environment.to_arrows(policy_iteration(sequential_decision_environment))) > > > . ^ None ^ . ^ > ^ < """ # noqa """ s = { 'a' : { 'plan1' : [(0.2, 'a'), (0.3, 'b'), (0.3, 'c'), (0.2, 'd')], 'plan2' : [(0.4, 'a'), (0.15, 'b'), (0.45, 'c')], 'plan3' : [(0.2, 'a'), (0.5, 'b'), (0.3, 'c')], }, 'b' : { 'plan1' : [(0.2, 'a'), (0.6, 'b'), (0.2, 'c'), (0.1, 'd')], 'plan2' : [(0.6, 'a'), (0.2, 'b'), (0.1, 'c'), (0.1, 'd')], 'plan3' : [(0.3, 'a'), (0.3, 'b'), (0.4, 'c')], }, 'c' : { 'plan1' : [(0.3, 'a'), (0.5, 'b'), (0.1, 'c'), (0.1, 'd')], 'plan2' : [(0.5, 'a'), (0.3, 'b'), (0.1, 'c'), (0.1, 'd')], 'plan3' : [(0.1, 'a'), (0.3, 'b'), (0.1, 'c'), (0.5, 'd')], }, } """