Source code for search

"""
Search (Chapters 3-4)

The way to use this code is to subclass Problem to create a class of problems,
then create problem instances and solve them with calls to the various search
functions.
"""

import sys
from collections import deque

from utils import *


[docs] class Problem: """The abstract class for a formal problem. You should subclass this and implement the methods actions and result, and possibly __init__, goal_test, and path_cost. Then you will create instances of your subclass and solve them with the various search functions.""" def __init__(self, initial, goal=None): """The constructor specifies the initial state, and possibly a goal state, if there is a unique goal. Your subclass's constructor can add other arguments.""" self.initial = initial self.goal = goal
[docs] def actions(self, state): """Return the actions that can be executed in the given state. The result would typically be a list, but if there are many actions, consider yielding them one at a time in an iterator, rather than building them all at once.""" raise NotImplementedError
[docs] def result(self, state, action): """Return the state that results from executing the given action in the given state. The action must be one of self.actions(state).""" raise NotImplementedError
[docs] def goal_test(self, state): """Return True if the state is a goal. The default method compares the state to self.goal or checks for state in self.goal if it is a list, as specified in the constructor. Override this method if checking against a single self.goal is not enough.""" if isinstance(self.goal, list): return is_in(state, self.goal) else: return state == self.goal
[docs] def path_cost(self, c, state1, action, state2): """Return the cost of a solution path that arrives at state2 from state1 via action, assuming cost c to get up to state1. If the problem is such that the path doesn't matter, this function will only look at state2. If the path does matter, it will consider c and maybe state1 and action. The default method costs 1 for every step in the path.""" return c + 1
[docs] def value(self, state): """For optimization problems, each state has a value. Hill Climbing and related algorithms try to maximize this value.""" raise NotImplementedError
# ______________________________________________________________________________
[docs] class Node: """A node in a search tree. Contains a pointer to the parent (the node that this is a successor of) and to the actual state for this node. Note that if a state is arrived at by two paths, then there are two nodes with the same state. Also includes the action that got us to this state, and the total path_cost (also known as g) to reach the node. Other functions may add an f and h value; see best_first_graph_search and astar_search for an explanation of how the f and h values are handled. You will not need to subclass this class.""" def __init__(self, state, parent=None, action=None, path_cost=0): """Create a search tree Node, derived from a parent by an action.""" self.state = state self.parent = parent self.action = action self.path_cost = path_cost self.depth = 0 if parent: self.depth = parent.depth + 1 def __repr__(self): return "<Node {}>".format(self.state) def __lt__(self, node): return self.state < node.state
[docs] def expand(self, problem): """List the nodes reachable in one step from this node.""" return [self.child_node(problem, action) for action in problem.actions(self.state)]
[docs] def child_node(self, problem, action): """[Figure 3.10]""" next_state = problem.result(self.state, action) next_node = Node(next_state, self, action, problem.path_cost(self.path_cost, self.state, action, next_state)) return next_node
[docs] def solution(self): """Return the sequence of actions to go from the root to this node.""" return [node.action for node in self.path()[1:]]
[docs] def path(self): """Return a list of nodes forming the path from the root to this node.""" node, path_back = self, [] while node: path_back.append(node) node = node.parent return list(reversed(path_back))
# We want for a queue of nodes in breadth_first_graph_search or # astar_search to have no duplicated states, so we treat nodes # with the same state as equal. [Problem: this may not be what you # want in other contexts.] def __eq__(self, other): return isinstance(other, Node) and self.state == other.state def __hash__(self): # We use the hash value of the state # stored in the node instead of the node # object itself to quickly search a node # with the same state in a Hash Table return hash(self.state)
# ______________________________________________________________________________
[docs] class SimpleProblemSolvingAgentProgram: """ [Figure 3.1] Abstract framework for a problem-solving agent. """ def __init__(self, initial_state=None): """State is an abstract representation of the state of the world, and seq is the list of actions required to get to a particular state from the initial state(root).""" self.state = initial_state self.seq = [] def __call__(self, percept): """[Figure 3.1] Formulate a goal and problem, then search for a sequence of actions to solve it.""" self.state = self.update_state(self.state, percept) if not self.seq: goal = self.formulate_goal(self.state) problem = self.formulate_problem(self.state, goal) self.seq = self.search(problem) if not self.seq: return None return self.seq.pop(0)
[docs] def update_state(self, state, percept): """Update the agent's world state from the latest percept (abstract).""" raise NotImplementedError
[docs] def formulate_goal(self, state): """Decide on a goal to pursue given the current state (abstract).""" raise NotImplementedError
[docs] def formulate_problem(self, state, goal): """Build a Problem instance from the current state and chosen goal (abstract).""" raise NotImplementedError
[docs] def search(self, problem): """Return a sequence of actions that solves the given problem (abstract).""" raise NotImplementedError
# ______________________________________________________________________________ # Uninformed Search algorithms # ______________________________________________________________________________ # Bidirectional Search # Pseudocode from https://webdocs.cs.ualberta.ca/%7Eholte/Publications/MM-AAAI2016.pdf # ______________________________________________________________________________ # Informed (Heuristic) Search greedy_best_first_graph_search = best_first_graph_search greedy_best_first_tree_search = best_first_tree_search # Greedy best-first search is accomplished by specifying f(n) = h(n). # ______________________________________________________________________________ # A* heuristics
[docs] class EightPuzzle(Problem): """ The problem of sliding tiles numbered from 1 to 8 on a 3x3 board, where one of the squares is a blank. A state is represented as a tuple of length 9, where element at index i represents the tile number at index i (0 if it's an empty square) """ def __init__(self, initial, goal=(1, 2, 3, 4, 5, 6, 7, 8, 0)): """ Define goal state and initialize a problem """ super().__init__(initial, goal)
[docs] def find_blank_square(self, state): """Return the index of the blank square in a given state""" return state.index(0)
[docs] def actions(self, state): """ Return the actions that can be executed in the given state. The result would be a list, since there are only four possible actions in any given state of the environment """ possible_actions = ['UP', 'DOWN', 'LEFT', 'RIGHT'] index_blank_square = self.find_blank_square(state) if index_blank_square % 3 == 0: possible_actions.remove('LEFT') if index_blank_square < 3: possible_actions.remove('UP') if index_blank_square % 3 == 2: possible_actions.remove('RIGHT') if index_blank_square > 5: possible_actions.remove('DOWN') return possible_actions
[docs] def result(self, state, action): """ Given state and action, return a new state that is the result of the action. Action is assumed to be a valid action in the state """ # blank is the index of the blank square blank = self.find_blank_square(state) new_state = list(state) delta = {'UP': -3, 'DOWN': 3, 'LEFT': -1, 'RIGHT': 1} neighbor = blank + delta[action] new_state[blank], new_state[neighbor] = new_state[neighbor], new_state[blank] return tuple(new_state)
[docs] def goal_test(self, state): """ Given a state, return True if state is a goal state or False, otherwise """ return state == self.goal
[docs] def check_solvability(self, state): """ Checks if the given state is solvable """ inversion = 0 for i in range(len(state)): for j in range(i + 1, len(state)): if (state[i] > state[j]) and state[i] != 0 and state[j] != 0: inversion += 1 return inversion % 2 == 0
[docs] def h(self, node): """ Return the heuristic value for a given state. Default heuristic function used is h(n) = number of misplaced tiles """ return sum(s != g for (s, g) in zip(node.state, self.goal))
# ______________________________________________________________________________
[docs] class NPuzzle(Problem): """Generalization of the Eight Puzzle problem. The problem consists of sliding tiles numbered from 1 to N ^2 on a NxN board, where one of the squares is a blank. The state is represented as a tuple of length N^2, where element at index i represents the tile number at index i , where '0' denotes empty square index""" def __init__(self, initial=None, goal=None, size=3, heuristic='hamming', shuffle=10): """Args: intial: intiail state of puzzle. If not passed will default to goal state size: number of rows and columns in puzzle heuristic: heuristic used in astat search shuffle: number of times to perfrom random action from the initial state. Returns: Problem instance modeling any size of sliding puzzle""" self.size = size self.heuristic = heuristic goal = goal or (tuple(range(1,size*size, 1)) + (0, )) initial = initial or (tuple(range(1,size*size, 1)) + (0, )) if shuffle: initial = self.shuffle_state(shuffle, list(initial)) assert len(goal) == size*size, "length of goal tuple should be {length}".format(length=size*size) assert len(initial) == size * size, "length of initial tuple should be {length}".format(length=size * size) assert self.check_solvability(initial), "The puzzle is not solvable from the given initial state!" Problem.__init__(self, initial, goal)
[docs] def shuffle_state(self, shuffle, state): """Perform random action from the initial state""" for _ in range(shuffle): action = random.sample(self.actions(state),1)[0] state = self.result(state,action) return state
[docs] def find_blank_square(self, state): """Return the index of the blank square in a given state""" return state.index(0)
[docs] def actions(self, state): """Return the actions that can be executed in the given state. The result would be a list of maximally four directions, since there are only four possible actions in any given state of the environment""" possible_actions = ['UP', 'DOWN', 'LEFT', 'RIGHT'] index_blank_square = self.find_blank_square(state) if index_blank_square % self.size == 0: possible_actions.remove('LEFT') if index_blank_square < self.size: possible_actions.remove('UP') if index_blank_square % self.size == self.size -1 : possible_actions.remove('RIGHT') if index_blank_square > self.size*self.size - self.size - 1: possible_actions.remove('DOWN') return possible_actions
[docs] def result(self, state, action): """Given state and action, return a new state that is the result of the action. Action is assumed to be a valid action in the state""" index_blank_square = self.find_blank_square(state) new_state = list(state) delta = {'UP': -self.size, 'DOWN': self.size, 'LEFT': -1, 'RIGHT': 1} neighbor = index_blank_square + delta[action] new_state[index_blank_square], new_state[neighbor] = new_state[neighbor], new_state[index_blank_square] return tuple(new_state)
[docs] def goal_test(self, state): """Given a state, return True if state is a goal state or False otherwise""" return state == self.goal
[docs] def check_solvability(self, state): """Checks if the given state is solvable. Whether or not puzzle is solvable depends on size being even or odd. If N is odd, then puzzle instance is solvable if number of inversions is even in the input state. If N is even, puzzle instance is solvable if the blank is on an even row counting from the bottom (second-last, fourth-last, etc.) and number of inversions is odd or the blank is on an odd row counting from the bottom (last, third-last, fifth-last, etc.) and number of inversions is even." reference : https://www.geeksforgeeks.org/check-instance-15-puzzle-solvable/""" solvable = True inversions = 0 for i in range(len(state)-1): for j in range(i+1, len(state)): if state[i] != 0 and state[j] != 0 and state[i] > state[j]: inversions += 1 if self.size % 2 != 0: solvable = True if (inversions % 2 == 0 ) else False elif self.find_blank_square(state) // self.size % 2 == 0 and inversions % 2 != 0 : solvable = True elif self.find_blank_square(state) // self.size % 2 != 0 and inversions % 2 == 0: solvable = True else: solvable = False return solvable
[docs] def h(self, node): """Return the heuristic value for a given state. Default heuristic function used is h(n) = number of misplaced tiles""" heuristics = {'hamming':self.hamming_distance_heuristic(node)} return heuristics.get(self.heuristic)
[docs] def hamming_distance_heuristic(self, node): """Number of tiles that are not in their goal position (Hamming distance).""" return hamming_distance(node.state, self.goal)
[docs] class TravelingSalesman(Problem): """ The problem of finding the shortest Hamiltonian cycle (tour) that visits every city exactly once and returns to the start.""" def __init__(self, all_cities, initial, goal=None): # initial = (0), goal = some state that its first and last values are 0 """ Define goal state and initialize a problem """ super().__init__(initial, goal) self.cities_matrix = self.init_distance_matrix(all_cities) self.cities_identifiers = set(all_cities.keys()) self.cities_amount = len(self.cities_identifiers) self.trees_evaluation_hash = {}
[docs] def value(self, state): """ Define state value """ overall_distance = 0.0 state_max_ind = len(state) - 1 for ind, city in enumerate(state): if ind != state_max_ind: overall_distance += self.cities_matrix[state[ind]][state[ind + 1]] return overall_distance
[docs] def actions(self, state): """ Return the actions that can be executed in the given state. The result would be a list""" if len(state) == self.cities_amount: return [0] return self.get_unvisited_cities(state)
[docs] def result(self, state, action): """ Given state and action, return a new state that is the result of the action. Action is assumed to be a valid action in the state """ # blank is the index of the blank square new_state = list(state) new_state.append(action) return tuple(new_state)
[docs] def goal_test(self, state): """ Given a state, return True if state is a goal state or False, otherwise""" return len(state) == (self.cities_amount + 1) and state[0] == state[self.cities_amount]
[docs] def get_unvisited_cities(self, state): """ Returns unused actions(cities) """ return self.cities_identifiers.difference(state)
[docs] def path_cost(self, c, state1, action, state2): """ Return next state cost """ return c + self.cities_matrix[state1[-1]][state2[-1]]
[docs] def h(self, node): """Admissible heuristic: total edge weight of the MST over the cities that have not yet been visited.""" h = 0.0 unvisited_cities = self.get_unvisited_cities(node.state) mst_evaluation = self.eval_unvisited_mst_edges_sum(unvisited_cities) h += mst_evaluation return h
[docs] def eval_unvisited_mst_edges_sum(self, unvisited_nodes): """Return the total MST edge weight over the unvisited cities (0.0 if none).""" if not unvisited_nodes: return 0.0 return self.tsp_kruskal(unvisited_nodes)
[docs] def tsp_kruskal(self, unvisited_nodes): """ Generate MST of the graph and return sum of edges """ forest_value = 0.0 disjoint_sets = DisjointSets() non_sorted_edges_list = [] for vertex_x in unvisited_nodes: disjoint_sets.make_set(vertex_x) for vertex_y in unvisited_nodes: if vertex_x < vertex_y: non_sorted_edges_list.append((vertex_x, vertex_y)) edges_list = sorted(non_sorted_edges_list, key=lambda edge: self.cities_matrix[edge[0]][edge[1]], reverse=False) for vertex_u, vertex_v in edges_list: if disjoint_sets.find(vertex_v) != disjoint_sets.find(vertex_u): forest_value += self.cities_matrix[vertex_u][vertex_v] disjoint_sets.union(vertex_v, vertex_u) return forest_value
[docs] def init_distance_matrix(self, all_cities): """ Generate matrix of distance between all cities """ return {row_ind: {col_ind: distance(row_coords, col_coords) for col_ind, col_coords in all_cities.items()} for row_ind, row_coords in all_cities.items()}
[docs] class DisjointSets: """Union-find (disjoint-set) structure with union by rank, used to build the MST for the TravelingSalesman heuristic.""" def __init__(self): self.parent = {} self.rank = {}
[docs] def make_set(self, vertex): """Create a new singleton set whose only element is vertex.""" self.parent[vertex] = vertex self.rank[vertex] = 0
[docs] def find(self, vertex): """Return the representative (root) of the set that vertex belongs to.""" if self.parent[vertex] == vertex: return vertex return self.find(self.parent[vertex])
[docs] def union(self, vertex_u, vertex_v): """Merge the sets containing vertex_u and vertex_v, attaching the lower-rank tree under the higher-rank one (union by rank).""" vertex_u_root = self.find(vertex_u) vertex_v_root = self.find(vertex_v) if self.rank[vertex_u_root] > self.rank[vertex_v_root]: self.parent[vertex_v_root] = vertex_u_root elif self.rank[vertex_u_root] < self.rank[vertex_v_root]: self.parent[vertex_u_root] = vertex_v_root else: self.parent[vertex_u_root] = vertex_v_root self.rank[vertex_v_root] += 1
# ______________________________________________________________________________
[docs] class PlanRoute(Problem): """ The problem of moving the Hybrid Wumpus Agent from one place to other """ def __init__(self, initial, goal, allowed, dimrow): """ Define goal state and initialize a problem """ super().__init__(initial, goal) self.dimrow = dimrow self.goal = goal self.allowed = allowed
[docs] def actions(self, state): """ Return the actions that can be executed in the given state. The result would be a list, since there are only three possible actions in any given state of the environment """ possible_actions = ['Forward', 'TurnLeft', 'TurnRight'] x, y = state.get_location() orientation = state.get_orientation() # Prevent Bumps if x == 1 and orientation == 'LEFT': if 'Forward' in possible_actions: possible_actions.remove('Forward') if y == 1 and orientation == 'DOWN': if 'Forward' in possible_actions: possible_actions.remove('Forward') if x == self.dimrow and orientation == 'RIGHT': if 'Forward' in possible_actions: possible_actions.remove('Forward') if y == self.dimrow and orientation == 'UP': if 'Forward' in possible_actions: possible_actions.remove('Forward') return possible_actions
[docs] def result(self, state, action): """ Given state and action, return a new state that is the result of the action. Action is assumed to be a valid action in the state. A fresh state object is returned; the input state is never mutated, so the search tree stays consistent. """ x, y = state.get_location() orientation = state.get_orientation() proposed_loc = None new_orientation = orientation # Move Forward if action == 'Forward': if orientation == 'UP': proposed_loc = [x, y + 1] elif orientation == 'DOWN': proposed_loc = [x, y - 1] elif orientation == 'LEFT': proposed_loc = [x - 1, y] elif orientation == 'RIGHT': proposed_loc = [x + 1, y] else: raise Exception('InvalidOrientation') # Rotate counter-clockwise elif action == 'TurnLeft': new_orientation = {'UP': 'LEFT', 'DOWN': 'RIGHT', 'LEFT': 'DOWN', 'RIGHT': 'UP'}[orientation] # Rotate clockwise elif action == 'TurnRight': new_orientation = {'UP': 'RIGHT', 'DOWN': 'LEFT', 'LEFT': 'UP', 'RIGHT': 'DOWN'}[orientation] new_x, new_y = x, y if proposed_loc is not None and proposed_loc in self.allowed: new_x, new_y = proposed_loc[0], proposed_loc[1] return type(state)(new_x, new_y, new_orientation)
[docs] def goal_test(self, state): """ Return True when the state matches any goal position. The goal is a collection of [x, y] cells or of position objects; for the latter the orientation must match too (e.g. a shooting position). """ for goal in self.goal: if hasattr(goal, 'get_location'): if (state.get_location() == goal.get_location() and state.get_orientation() == goal.get_orientation()): return True elif list(state.get_location()) == list(goal): return True return False
[docs] def h(self, node): """ Manhattan distance from the node's location to the nearest goal cell. """ x1, y1 = node.state.get_location() distances = [] for goal in self.goal: x2, y2 = goal.get_location() if hasattr(goal, 'get_location') else (goal[0], goal[1]) distances.append(abs(x2 - x1) + abs(y2 - y1)) return min(distances) if distances else 0
# ______________________________________________________________________________ # Other search algorithms
[docs] def hill_climbing(problem): """ [Figure 4.2] From the initial node, keep choosing the neighbor with highest value, stopping when no neighbor is better. """ current = Node(problem.initial) while True: neighbors = current.expand(problem) if not neighbors: break neighbor = argmax_random_tie(neighbors, key=lambda node: problem.value(node.state)) if problem.value(neighbor.state) <= problem.value(current.state): break current = neighbor return current.state
[docs] def exp_schedule(k=20, lam=0.005, limit=100): """One possible schedule function for simulated annealing""" return lambda t: (k * np.exp(-lam * t) if t < limit else 0)
[docs] def simulated_annealing(problem, schedule=exp_schedule()): """[Figure 4.5] CAUTION: This differs from the pseudocode as it returns a state instead of a Node.""" current = Node(problem.initial) for t in range(sys.maxsize): T = schedule(t) if T == 0: return current.state neighbors = current.expand(problem) if not neighbors: return current.state next_choice = random.choice(neighbors) delta_e = problem.value(next_choice.state) - problem.value(current.state) if delta_e > 0 or probability(np.exp(delta_e / T)): current = next_choice
[docs] def simulated_annealing_full(problem, schedule=exp_schedule()): """ This version returns all the states encountered in reaching the goal state.""" states = [] current = Node(problem.initial) for t in range(sys.maxsize): states.append(current.state) T = schedule(t) if T == 0: return states neighbors = current.expand(problem) if not neighbors: return current.state next_choice = random.choice(neighbors) delta_e = problem.value(next_choice.state) - problem.value(current.state) if delta_e > 0 or probability(np.exp(delta_e / T)): current = next_choice
# Pre-defined actions for PeakFindingProblem directions4 = {'W': (-1, 0), 'N': (0, 1), 'E': (1, 0), 'S': (0, -1)} directions8 = dict(directions4) directions8.update({'NW': (-1, 1), 'NE': (1, 1), 'SE': (1, -1), 'SW': (-1, -1)})
[docs] class PourProblem(Problem): """Problem about pouring water between jugs to achieve some water level. Each state is a tuple of levels. In the initialization, provide a tuple of capacities, e.g. PourProblem(initial=(2, 4, 3), goals={7}, capacities=(8, 16, 32)), which means three jugs of capacity 8, 16, 32 currently filled with 2, 4, 3 units of water, respectively, and the goal is to get a level of 7 in any one of the jugs.""" def __init__(self, initial=None, goals=(), capacities=None): super().__init__(initial, goals) self.goals = goals self.capacities = capacities
[docs] def actions(self, state): """The actions executable in this state: fill or dump any jug, or pour one into another.""" jugs = range(len(state)) return ([('Fill', i) for i in jugs if state[i] != self.capacities[i]] + [('Dump', i) for i in jugs if state[i] != 0] + [('Pour', i, j) for i in jugs for j in jugs if i != j])
[docs] def result(self, state, action): """The state that results from executing this action in this state.""" result = list(state) act, i, j = action[0], action[1], action[-1] if act == 'Fill': # fill jug i to its capacity result[i] = self.capacities[i] elif act == 'Dump': # empty jug i result[i] = 0 elif act == 'Pour': # pour jug i into jug j a, b = state[i], state[j] result[i], result[j] = ((0, a + b) if (a + b <= self.capacities[j]) else (a + b - self.capacities[j], self.capacities[j])) else: raise ValueError('unknown action', action) return tuple(result)
[docs] def goal_test(self, state): """True if any of the jugs has a level equal to one of the goal levels.""" return any(level in self.goals for level in state)
[docs] class PeakFindingProblem(Problem): """Problem of finding the highest peak in a limited grid""" def __init__(self, initial, grid, defined_actions=directions4): """The grid is a 2 dimensional array/list whose state is specified by tuple of indices""" super().__init__(initial) self.grid = grid self.defined_actions = defined_actions self.n = len(grid) assert self.n > 0 self.m = len(grid[0]) assert self.m > 0
[docs] def actions(self, state): """Returns the list of actions which are allowed to be taken from the given state""" allowed_actions = [] for action in self.defined_actions: next_state = vector_add(state, self.defined_actions[action]) if 0 <= next_state[0] <= self.n - 1 and 0 <= next_state[1] <= self.m - 1: allowed_actions.append(action) return allowed_actions
[docs] def result(self, state, action): """Moves in the direction specified by action""" return vector_add(state, self.defined_actions[action])
[docs] def value(self, state): """Value of a state is the value it is the index to""" x, y = state assert 0 <= x < self.n assert 0 <= y < self.m return self.grid[x][y]
[docs] class OnlineDFSAgent: """ [Figure 4.21] The abstract class for an OnlineDFSAgent. Override update_state method to convert percept to state. While initializing the subclass a problem needs to be provided which is an instance of a subclass of the Problem class. """ def __init__(self, problem): self.problem = problem self.s = None self.a = None self.untried = dict() self.unbacktracked = dict() self.result = {} def __call__(self, percept): s1 = self.update_state(percept) if self.problem.goal_test(s1): self.a = None else: if s1 not in self.untried.keys(): self.untried[s1] = list(self.problem.actions(s1)) self.unbacktracked[s1] = [] if self.s is not None: self.result[(self.s, self.a)] = s1 self.unbacktracked[s1].insert(0, self.s) if len(self.untried[s1]) == 0: if len(self.unbacktracked[s1]) == 0: self.a = None else: # else a <- an action b such that result[s', b] = POP(unbacktracked[s']) unbacktracked_pop = self.unbacktracked[s1].pop(0) for (s, b) in self.result.keys(): if s == s1 and self.result[(s, b)] == unbacktracked_pop: self.a = b break else: self.a = self.untried[s1].pop() self.s = s1 return self.a
[docs] def update_state(self, percept): """To be overridden in most cases. The default case assumes the percept to be of type state.""" return percept
# ______________________________________________________________________________
[docs] class OnlineSearchProblem(Problem): """ A problem which is solved by an agent executing actions, rather than by just computation. Carried in a deterministic and a fully observable environment.""" def __init__(self, initial, goal, graph): super().__init__(initial, goal) self.graph = graph
[docs] def actions(self, state): """Return the actions available from state (the outgoing graph edges).""" return self.graph.graph_dict[state].keys()
[docs] def output(self, state, action): """Return the state reached by taking action in state.""" return self.graph.graph_dict[state][action]
[docs] def h(self, state): """Returns least possible cost to reach a goal for the given state.""" return self.graph.least_costs[state]
[docs] def c(self, s, a, s1): """Returns a cost estimate for an agent to move from state 's' to state 's1'.""" return 1
[docs] def update_state(self, percept): """Convert a percept into a state (abstract; override in subclasses).""" raise NotImplementedError
[docs] def goal_test(self, state): """Return True if state is the goal state.""" if state == self.goal: return True return False
[docs] class LRTAStarAgent: """ [Figure 4.24] Abstract class for LRTA*-Agent. A problem needs to be provided which is an instance of a subclass of Problem Class. Takes a OnlineSearchProblem [Figure 4.23] as a problem. """ def __init__(self, problem): self.problem = problem # self.result = {} # no need as we are using problem.result self.H = {} self.s = None self.a = None def __call__(self, s1): # as of now s1 is a state rather than a percept if self.problem.goal_test(s1): self.a = None return self.a else: if s1 not in self.H: self.H[s1] = self.problem.h(s1) if self.s is not None: # self.result[(self.s, self.a)] = s1 # no need as we are using problem.output # minimum cost for action b in problem.actions(s) self.H[self.s] = min(self.LRTA_cost(self.s, b, self.problem.output(self.s, b), self.H) for b in self.problem.actions(self.s)) # an action b in problem.actions(s1) that minimizes costs self.a = min(self.problem.actions(s1), key=lambda b: self.LRTA_cost(s1, b, self.problem.output(s1, b), self.H)) self.s = s1 return self.a
[docs] def LRTA_cost(self, s, a, s1, H): """Returns cost to move from state 's' to state 's1' plus estimated cost to get to goal from s1.""" print(s, a, s1) if s1 is None: return self.problem.h(s) else: # sometimes we need to get H[s1] which we haven't yet added to H # to replace this try, except: we can initialize H with values from problem.h try: return self.problem.c(s, a, s1) + self.H[s1] except: return self.problem.c(s, a, s1) + self.problem.h(s1)
# ______________________________________________________________________________ # Genetic Algorithm
[docs] def genetic_algorithm(population, fitness_fn, gene_pool=[0, 1], f_thres=None, ngen=1000, pmut=0.1): """[Figure 4.8]""" for i in range(ngen): population = [mutate(recombine(*select(2, population, fitness_fn)), gene_pool, pmut) for i in range(len(population))] fittest_individual = fitness_threshold(fitness_fn, f_thres, population) if fittest_individual: return fittest_individual return max(population, key=fitness_fn)
[docs] def fitness_threshold(fitness_fn, f_thres, population): """Return the fittest individual if its fitness reaches f_thres, otherwise None (also None when no threshold f_thres is given).""" if not f_thres: return None fittest_individual = max(population, key=fitness_fn) if fitness_fn(fittest_individual) >= f_thres: return fittest_individual return None
[docs] def init_population(pop_number, gene_pool, state_length): """Initializes population for genetic algorithm pop_number : Number of individuals in population gene_pool : List of possible values for individuals state_length: The length of each individual""" g = len(gene_pool) population = [] for i in range(pop_number): new_individual = [gene_pool[random.randrange(0, g)] for j in range(state_length)] population.append(new_individual) return population
[docs] def select(r, population, fitness_fn): """Select r individuals from the population, sampling each with probability proportional to its fitness.""" fitnesses = map(fitness_fn, population) sampler = weighted_sampler(population, fitnesses) return [sampler() for i in range(r)]
[docs] def recombine(x, y): """Single-point crossover: combine a random-length prefix of x with the corresponding suffix of y.""" n = len(x) c = random.randrange(0, n) return x[:c] + y[c:]
[docs] def recombine_uniform(x, y): """Uniform crossover: build a child string by taking roughly half of its genes from x and the rest from y, at random positions.""" n = len(x) result = [0] * n indexes = random.sample(range(n), n) for i in range(n): ix = indexes[i] result[ix] = x[ix] if i < n / 2 else y[ix] return ''.join(str(r) for r in result)
[docs] def mutate(x, gene_pool, pmut): """With probability pmut, replace one randomly chosen gene of x with a random gene from gene_pool; otherwise return x unchanged.""" if random.uniform(0, 1) >= pmut: return x n = len(x) g = len(gene_pool) c = random.randrange(0, n) r = random.randrange(0, g) new_gene = gene_pool[r] return x[:c] + [new_gene] + x[c + 1:]
# _____________________________________________________________________________ # The remainder of this file implements examples for the search algorithms. # ______________________________________________________________________________ # Graphs and Graph Problems
[docs] class Graph: """A graph connects nodes (vertices) by edges (links). Each edge can also have a length associated with it. The constructor call is something like:: g = Graph({'A': {'B': 1, 'C': 2}) this makes a graph with 3 nodes, A, B, and C, with an edge of length 1 from A to B, and an edge of length 2 from A to C. You can also do:: g = Graph({'A': {'B': 1, 'C': 2}, directed=False) This makes an undirected graph, so inverse links are also added. The graph stays undirected; if you add more links with g.connect('B', 'C', 3), then inverse link is also added. You can use g.nodes() to get a list of nodes, g.get('A') to get a dict of links out of A, and g.get('A', 'B') to get the length of the link from A to B. 'Lengths' can actually be any object at all, and nodes can be any hashable object.""" def __init__(self, graph_dict=None, directed=True): self.graph_dict = graph_dict or {} self.directed = directed if not directed: self.make_undirected()
[docs] def make_undirected(self): """Make a digraph into an undirected graph by adding symmetric edges.""" for a in list(self.graph_dict.keys()): for (b, dist) in self.graph_dict[a].items(): self.connect1(b, a, dist)
[docs] def connect(self, A, B, distance=1): """Add a link from A and B of given distance, and also add the inverse link if the graph is undirected.""" self.connect1(A, B, distance) if not self.directed: self.connect1(B, A, distance)
[docs] def connect1(self, A, B, distance): """Add a link from A to B of given distance, in one direction only.""" self.graph_dict.setdefault(A, {})[B] = distance
[docs] def get(self, a, b=None): """Return a link distance or a dict of {node: distance} entries. .get(a,b) returns the distance or None; .get(a) returns a dict of {node: distance} entries, possibly {}.""" links = self.graph_dict.setdefault(a, {}) if b is None: return links else: return links.get(b)
[docs] def nodes(self): """Return a list of nodes in the graph.""" s1 = set([k for k in self.graph_dict.keys()]) s2 = set([k2 for v in self.graph_dict.values() for k2, v2 in v.items()]) nodes = s1.union(s2) return list(nodes)
[docs] def UndirectedGraph(graph_dict=None): """Build a Graph where every edge (including future ones) goes both ways.""" return Graph(graph_dict=graph_dict, directed=False)
[docs] def RandomGraph(nodes=list(range(10)), min_links=2, width=400, height=300, curvature=lambda: random.uniform(1.1, 1.5)): """Construct a random graph, with the specified nodes, and random links. The nodes are laid out randomly on a (width x height) rectangle. Then each node is connected to the min_links nearest neighbors. Because inverse links are added, some nodes will have more connections. The distance between nodes is the hypotenuse times curvature(), where curvature() defaults to a random number between 1.1 and 1.5.""" g = UndirectedGraph() g.locations = {} # Build the cities for node in nodes: g.locations[node] = (random.randrange(width), random.randrange(height)) # Build roads from each city to at least min_links nearest neighbors. for i in range(min_links): for node in nodes: if len(g.get(node)) < min_links: here = g.locations[node] def distance_to_node(n): if n is node or g.get(node, n): return np.inf return distance(g.locations[n], here) neighbor = min(nodes, key=distance_to_node) d = distance(g.locations[neighbor], here) * curvature() g.connect(node, neighbor, int(d)) return g
""" [Figure 3.2] Simplified road map of Romania """ romania_map = UndirectedGraph(dict( Arad=dict(Zerind=75, Sibiu=140, Timisoara=118), Bucharest=dict(Urziceni=85, Pitesti=101, Giurgiu=90, Fagaras=211), Craiova=dict(Drobeta=120, Rimnicu=146, Pitesti=138), Drobeta=dict(Mehadia=75), Eforie=dict(Hirsova=86), Fagaras=dict(Sibiu=99), Hirsova=dict(Urziceni=98), Iasi=dict(Vaslui=92, Neamt=87), Lugoj=dict(Timisoara=111, Mehadia=70), Oradea=dict(Zerind=71, Sibiu=151), Pitesti=dict(Rimnicu=97), Rimnicu=dict(Sibiu=80), Urziceni=dict(Vaslui=142))) romania_map.locations = dict( Arad=(91, 492), Bucharest=(400, 327), Craiova=(253, 288), Drobeta=(165, 299), Eforie=(562, 293), Fagaras=(305, 449), Giurgiu=(375, 270), Hirsova=(534, 350), Iasi=(473, 506), Lugoj=(165, 379), Mehadia=(168, 339), Neamt=(406, 537), Oradea=(131, 571), Pitesti=(320, 368), Rimnicu=(233, 410), Sibiu=(207, 457), Timisoara=(94, 410), Urziceni=(456, 350), Vaslui=(509, 444), Zerind=(108, 531)) """ [Figure 4.9] Eight possible states of the vacumm world Each state is represented as * "State of the left room" "State of the right room" "Room in which the agent is present" 1 - DDL Dirty Dirty Left 2 - DDR Dirty Dirty Right 3 - DCL Dirty Clean Left 4 - DCR Dirty Clean Right 5 - CDL Clean Dirty Left 6 - CDR Clean Dirty Right 7 - CCL Clean Clean Left 8 - CCR Clean Clean Right """ vacuum_world = Graph(dict( State_1=dict(Suck=['State_7', 'State_5'], Right=['State_2']), State_2=dict(Suck=['State_8', 'State_4'], Left=['State_2']), State_3=dict(Suck=['State_7'], Right=['State_4']), State_4=dict(Suck=['State_4', 'State_2'], Left=['State_3']), State_5=dict(Suck=['State_5', 'State_1'], Right=['State_6']), State_6=dict(Suck=['State_8'], Left=['State_5']), State_7=dict(Suck=['State_7', 'State_3'], Right=['State_8']), State_8=dict(Suck=['State_8', 'State_6'], Left=['State_7']) )) """ [Figure 4.23] One-dimensional state space Graph """ one_dim_state_space = Graph(dict( State_1=dict(Right='State_2'), State_2=dict(Right='State_3', Left='State_1'), State_3=dict(Right='State_4', Left='State_2'), State_4=dict(Right='State_5', Left='State_3'), State_5=dict(Right='State_6', Left='State_4'), State_6=dict(Left='State_5') )) one_dim_state_space.least_costs = dict( State_1=8, State_2=9, State_3=2, State_4=2, State_5=4, State_6=3) """ [Figure 6.1] Principal states and territories of Australia """ australia_map = UndirectedGraph(dict( T=dict(), SA=dict(WA=1, NT=1, Q=1, NSW=1, V=1), NT=dict(WA=1, Q=1), NSW=dict(Q=1, V=1))) australia_map.locations = dict(WA=(120, 24), NT=(135, 20), SA=(135, 30), Q=(145, 20), NSW=(145, 32), T=(145, 42), V=(145, 37))
[docs] class GraphProblem(Problem): """The problem of searching a graph from one node to another.""" def __init__(self, initial, goal, graph): super().__init__(initial, goal) self.graph = graph
[docs] def actions(self, A): """The actions at a graph node are just its neighbors.""" return list(self.graph.get(A).keys())
[docs] def result(self, state, action): """The result of going to a neighbor is just that neighbor.""" return action
[docs] def path_cost(self, cost_so_far, A, action, B): """Cost so far plus the length of the edge from A to B (np.inf if no edge).""" return cost_so_far + (self.graph.get(A, B) or np.inf)
[docs] def find_min_edge(self): """Find minimum value of edges.""" m = np.inf for d in self.graph.graph_dict.values(): local_min = min(d.values()) m = min(m, local_min) return m
[docs] def h(self, node): """h function is straight-line distance from a node's state to goal.""" locs = getattr(self.graph, 'locations', None) if locs: if type(node) is str: return int(distance(locs[node], locs[self.goal])) return int(distance(locs[node.state], locs[self.goal])) else: return np.inf
[docs] class GraphProblemStochastic(GraphProblem): """ A version of GraphProblem where an action can lead to nondeterministic output i.e. multiple possible states. Define the graph as dict(A = dict(Action = [[<Result 1>, <Result 2>, ...], <cost>], ...), ...) A the dictionary format is different, make sure the graph is created as a directed graph. """
[docs] def result(self, state, action): """Return the list of possible states that the action may lead to.""" return self.graph.get(state, action)
[docs] def path_cost(self): """Not defined for the stochastic graph problem.""" raise NotImplementedError
# ______________________________________________________________________________
[docs] class NQueensProblem(Problem): """The problem of placing N queens on an NxN board with none attacking each other. A state is represented as an N-element array, where a value of r in the c-th entry means there is a queen at column c, row r, and a value of -1 means that the c-th column has not been filled in yet. We fill in columns left to right. >>> depth_first_tree_search(NQueensProblem(8)) <Node (7, 3, 0, 2, 5, 1, 6, 4)> """ def __init__(self, N): super().__init__(tuple([-1] * N)) self.N = N
[docs] def actions(self, state): """In the leftmost empty column, try all non-conflicting rows.""" if state[-1] != -1: return [] # All columns filled; no successors else: col = state.index(-1) return [row for row in range(self.N) if not self.conflicted(state, row, col)]
[docs] def result(self, state, row): """Place the next queen at the given row.""" col = state.index(-1) new = list(state[:]) new[col] = row return tuple(new)
[docs] def conflicted(self, state, row, col): """Would placing a queen at (row, col) conflict with anything?""" return any(self.conflict(row, col, state[c], c) for c in range(col))
[docs] def conflict(self, row1, col1, row2, col2): """Would putting two queens in (row1, col1) and (row2, col2) conflict?""" return (row1 == row2 or # same row col1 == col2 or # same column row1 - col1 == row2 - col2 or # same \ diagonal row1 + col1 == row2 + col2) # same / diagonal
[docs] def goal_test(self, state): """Check if all columns filled, no conflicts.""" if state[-1] == -1: return False return not any(self.conflicted(state, state[col], col) for col in range(len(state)))
[docs] def h(self, node): """Return number of conflicting queens for a given node""" num_conflicts = 0 for (r1, c1) in enumerate(node.state): for (r2, c2) in enumerate(node.state): if (r1, c1) != (r2, c2): num_conflicts += self.conflict(r1, c1, r2, c2) return num_conflicts
# ______________________________________________________________________________ # Inverse Boggle: Search for a high-scoring Boggle board. A good domain for # iterative-repair and related search techniques, as suggested by Justin Boyan. ALPHABET = 'ABCDEFGHIJKLMNOPQRSTUVWXYZ' cubes16 = ['FORIXB', 'MOQABJ', 'GURILW', 'SETUPL', 'CMPDAE', 'ACITAO', 'SLCRAE', 'ROMASH', 'NODESW', 'HEFIYE', 'ONUDTK', 'TEVIGN', 'ANEDVZ', 'PINESH', 'ABILYT', 'GKYLEU']
[docs] def random_boggle(n=4): """Return a random Boggle board of size n x n. We represent a board as a linear list of letters.""" cubes = [cubes16[i % 16] for i in range(n * n)] random.shuffle(cubes) return list(map(random.choice, cubes))
# The best 5x5 board found by Boyan, with our word list this board scores # 2274 words, for a score of 9837 boyan_best = list('RSTCSDEIAEGNLRPEATESMSSID')
[docs] def boggle_neighbors(n2, cache={}): """Return a list of lists, where the i-th element is the list of indexes for the neighbors of square i.""" if cache.get(n2): return cache.get(n2) n = exact_sqrt(n2) neighbors = [None] * n2 for i in range(n2): neighbors[i] = [] on_top = i < n on_bottom = i >= n2 - n on_left = i % n == 0 on_right = (i + 1) % n == 0 if not on_top: neighbors[i].append(i - n) if not on_left: neighbors[i].append(i - n - 1) if not on_right: neighbors[i].append(i - n + 1) if not on_bottom: neighbors[i].append(i + n) if not on_left: neighbors[i].append(i + n - 1) if not on_right: neighbors[i].append(i + n + 1) if not on_left: neighbors[i].append(i - 1) if not on_right: neighbors[i].append(i + 1) cache[n2] = neighbors return neighbors
[docs] def exact_sqrt(n2): """If n2 is a perfect square, return its square root, else raise error.""" n = int(np.sqrt(n2)) assert n * n == n2 return n
# _____________________________________________________________________________
[docs] class Wordlist: """This class holds a list of words. You can use (word in wordlist) to check if a word is in the list, or wordlist.lookup(prefix) to see if prefix starts any of the words in the list.""" def __init__(self, file, min_len=3): lines = file.read().upper().split() self.words = [word for word in lines if len(word) >= min_len] self.words.sort() self.bounds = {} for c in ALPHABET: c2 = chr(ord(c) + 1) self.bounds[c] = (bisect.bisect(self.words, c), bisect.bisect(self.words, c2))
[docs] def lookup(self, prefix, lo=0, hi=None): """See if prefix is in dictionary, as a full word or as a prefix. Return two values: the first is the lowest i such that words[i].startswith(prefix), or is None; the second is True iff prefix itself is in the Wordlist.""" words = self.words if hi is None: hi = len(words) i = bisect.bisect_left(words, prefix, lo, hi) if i < len(words) and words[i].startswith(prefix): return i, (words[i] == prefix) else: return None, False
def __contains__(self, word): return self.lookup(word)[1] def __len__(self): return len(self.words)
# _____________________________________________________________________________
[docs] class BoggleFinder: """A class that allows you to find all the words in a Boggle board.""" wordlist = None # A class variable, holding a wordlist def __init__(self, board=None): if BoggleFinder.wordlist is None: BoggleFinder.wordlist = Wordlist(open_data("EN-text/wordlist.txt")) self.found = {} if board: self.set_board(board)
[docs] def set_board(self, board=None): """Set the board, and find all the words in it.""" if board is None: board = random_boggle() self.board = board self.neighbors = boggle_neighbors(len(board)) self.found = {} for i in range(len(board)): lo, hi = self.wordlist.bounds[board[i]] self.find(lo, hi, i, [], '') return self
[docs] def find(self, lo, hi, i, visited, prefix): """Looking in square i, find the words that continue the prefix, considering the entries in self.wordlist.words[lo:hi], and not revisiting the squares in visited.""" if i in visited: return wordpos, is_word = self.wordlist.lookup(prefix, lo, hi) if wordpos is not None: if is_word: self.found[prefix] = True visited.append(i) c = self.board[i] if c == 'Q': c = 'QU' prefix += c for j in self.neighbors[i]: self.find(wordpos, hi, j, visited, prefix) visited.pop()
[docs] def words(self): """The words found.""" return list(self.found.keys())
scores = [0, 0, 0, 0, 1, 2, 3, 5] + [11] * 100
[docs] def score(self): """The total score for the words found, according to the rules.""" return sum([self.scores[len(w)] for w in self.words()])
def __len__(self): """The number of words found.""" return len(self.found)
# _____________________________________________________________________________
[docs] def boggle_hill_climbing(board=None, ntimes=100, verbose=True): """Solve inverse Boggle by hill-climbing: find a high-scoring board by starting with a random one and changing it.""" finder = BoggleFinder() if board is None: board = random_boggle() best = len(finder.set_board(board)) for _ in range(ntimes): i, oldc = mutate_boggle(board) new = len(finder.set_board(board)) if new > best: best = new if verbose: print(best, _, board) else: board[i] = oldc # Change back if verbose: print_boggle(board) return board, best
[docs] def mutate_boggle(board): """Randomly replace one cell of a Boggle board with a random face of a random die, returning the changed index and the old letter so the move can be undone.""" i = random.randrange(len(board)) oldc = board[i] # random.choice(boyan_best) board[i] = random.choice(random.choice(cubes16)) return i, oldc
# ______________________________________________________________________________ # Code to compare searchers on various problems.
[docs] class InstrumentedProblem(Problem): """Delegates to a problem, and keeps statistics.""" def __init__(self, problem): self.problem = problem self.succs = self.goal_tests = self.states = 0 self.found = None
[docs] def actions(self, state): """Delegate to the wrapped problem, counting the successor query.""" self.succs += 1 return self.problem.actions(state)
[docs] def result(self, state, action): """Delegate to the wrapped problem, counting the generated state.""" self.states += 1 return self.problem.result(state, action)
[docs] def goal_test(self, state): """Delegate to the wrapped problem, counting the goal test and recording the state when it is a goal.""" self.goal_tests += 1 result = self.problem.goal_test(state) if result: self.found = state return result
[docs] def path_cost(self, c, state1, action, state2): """Delegate the path cost computation to the wrapped problem.""" return self.problem.path_cost(c, state1, action, state2)
[docs] def value(self, state): """Delegate the state value computation to the wrapped problem.""" return self.problem.value(state)
def __getattr__(self, attr): return getattr(self.problem, attr) def __repr__(self): return '<{:4d}/{:4d}/{:4d}/{}>'.format(self.succs, self.goal_tests, self.states, str(self.found)[:4])
[docs] def compare_searchers(problems, header, searchers=[breadth_first_tree_search, breadth_first_graph_search, depth_first_graph_search, iterative_deepening_search, depth_limited_search, recursive_best_first_search]): """Run every searcher on every problem and print a table of the resulting InstrumentedProblem statistics (successors / goal tests / states generated).""" def do(searcher, problem): p = InstrumentedProblem(problem) searcher(p) return p table = [[name(s)] + [do(s, p) for p in problems] for s in searchers] print_table(table, header)
[docs] def compare_graph_searchers(): """Prints a table of search results.""" compare_searchers(problems=[GraphProblem('Arad', 'Bucharest', romania_map), GraphProblem('Oradea', 'Neamt', romania_map), GraphProblem('Q', 'WA', australia_map)], header=['Searcher', 'romania_map(Arad, Bucharest)', 'romania_map(Oradea, Neamt)', 'australia_map'])