This is an explanation of the so-called Wavefront Expansion Algorithm, and how it applies to a discrete patch world.
We need an environment so that we may test and visualise our problem.
First we must define our environment that our agent is to explore:
0001 import random 0002 random.seed(0) # For the purpose of reproducing the result 0003 0004 def env_create(width = 5, height = 5, robot = (1, 1), nodes = 5, walls = []) : 0005 # Create basic world 0006 world = [] 0007 for y in range(height) : 0008 world += [ [] ] 0009 for x in range(width) : 0010 world[y] += [ ' ' ] 0011 # Add border 0012 for y in range(height) : 0013 world[y][0] = '#' 0014 world[y][width - 1] = '#' 0015 for x in range(width) : 0016 world[0][x] = '#' 0017 world[height - 1][x] = '#' 0018 # Add walls 0019 for wall in walls : 0020 print(wall) # TODO 0021 if not (wall[1] == robot[1] and wall[0] == robot[0]) and world[wall[1]][wall[0]] == ' ' : 0022 world[wall[1]][wall[0]] = '#' 0023 # Add nodes 0024 # TODO: In theory this could result in an infinite loop. 0025 while nodes > 0 : 0026 i = random.randint(0, width - 1) 0027 j = random.randint(0, height - 1) 0028 if j != robot[1] and i != robot[0] and world[j][i] == ' ' : 0029 world[j][i] = '@' 0030 nodes -= 1 0031 return world 0032 0033 r = (1, 1) 0034 w = env_create(8, 8, r, 2, [(1,2), (4,2), (4,3), (4,4), (2,4)])
(1, 2) (4, 2) (4, 3) (4, 4) (2, 4)
This consists of the basics, including a width
, height
, the location of the robot
(nothing can be placed here), a number of distributed nodes
(areas to be visited), and walls
(obstacles to be overlaid). A node can be placed randomly in any given patch that does not contain a wall.
We want a way to visualise the environment we created:
0035 def env_graphic(w, r, o = []) : 0036 width = len(w[0]) 0037 height = len(w) 0038 z = 32 0039 g = Svgg(width = width * z, height = height * z, bg = "#FFF") 0040 # Print the world 0041 for j in range(height) : 0042 for i in range(width) : 0043 bg = "#FFF" 0044 if w[j][i] == '#' : bg = "#222" 0045 if w[j][i] == '@' : bg = "#F80" 0046 if r[1] == i and r[0] == j : bg = "#00B" 0047 g.add("rect", {"x": i * z, "y": j * z, "width": z, "height": z, "fill": bg, "stroke": "#000"}) 0048 # Add text overlay 0049 for i in range(len(o)) : 0050 x = o[i][0] 0051 y = o[i][1] 0052 t = o[i][2] 0053 g.add("text", {"x": (x + 0.5) * z, "y": (y + 0.5) * z, "dominant-baseline": "middle", "text-anchor": "middle", "fill": "#F00", "value": t}) 0054 return g 0055 0056 print(env_graphic(w, r, o = [(r[0], r[1], 'X')]).to_uri())
In this visualisation, white spaces the agent can travel, grey spaces are obstacles, blue is the agent , yellow/orange are the nodes.
We will also want some tools to help us.
The first tool searches the environment and creates a list of all the matches. To retrieve a list of nodes, we search for the ‘@’ symbol like so:
0057 def map_get(w, s) : 0058 found = [] 0059 for j in range(len(w)) : 0060 for i in range(len(w[0])) : 0061 if w[j][i] == s : 0062 found += [ (i, j) ] 0063 return found 0064 0065 nodes = map_get(w, '@') 0066 print(nodes)
[(5, 3), (6, 6)]
Next, a function that describes the available actions an agent can take from a given location:
0067 def env_opts(w, r) : 0068 opts = [ 0069 (-1, 0), # Left 0070 ( 1, 0), # Right 0071 ( 0, -1), # Up 0072 ( 0, 1), # Down 0073 ] 0074 # Remove options not available 0075 for o in opts[:] : 0076 r_ = ( r[0] + o[0], r[1] + o[1] ) 0077 if w[r_[1]][r_[0]] == '#' : 0078 opts.remove(o) 0079 return opts 0080 0081 opts = env_opts(w, r) 0082 print(opts)
[(1, 0)]
As you can see, the only available option is to go right.
The wavefront algorithm is an iterative counter from a given starting location, until a goal is reached. The counter always describes how many moves were required to get to a location, and you never override the counter of a location already applied with a counter.
Let us create a counter map for the world:
0083 def wave_create(w, origin = (0, 0)) : 0084 map = [] 0085 for y in range(len(w)) : 0086 map += [ [] ] 0087 for x in range(len(w[0])) : 0088 map[y] += [ -1 ] 0089 # Place a zero at origin 0090 map[origin[1]][origin[0]] = 0 0091 return map 0092 0093 wave = wave_create(w, r)
And now let’s convert this to an overlay and visualise it:
0094 def wave_to_overlay(m) : 0095 overlay = [] 0096 for y in range(len(m)) : 0097 for x in range(len(m[0])) : 0098 if m[y][x] >= 0 : 0099 overlay += [ (x, y, str(m[y][x])) ] 0100 return overlay 0101 0102 overlay = wave_to_overlay(wave) 0103 print(env_graphic(w, r, o = overlay).to_uri())
Not too exciting.
Now we find all of the locations with a , and from each of those locations we calculate all the possible options from that location, and update all of the non-entered positions as a . This means that each one of those positions took step to reach that location.
0104 def wave_step(w, wave, i = 0) : 0105 wave_front = map_get(wave, i) 0106 for front in wave_front : 0107 opts = env_opts(w, front) 0108 for opt in opts : 0109 x = front[0] + opt[0] 0110 y = front[1] + opt[1] 0111 if wave[y][x] < 0 : 0112 wave[y][x] = i + 1 0113 return wave 0114 0115 wave = wave_step(w, wave, i = 0) 0116 overlay = wave_to_overlay(wave) 0117 print(env_graphic(w, r, o = overlay).to_uri())
Okay cool, the only available option from that position is explored. Now with the next step we expect a to be placed to the right and below the :
0118 wave = wave_step(w, wave, i = 1) 0119 overlay = wave_to_overlay(wave) 0120 print(env_graphic(w, r, o = overlay).to_uri())
Cool! Each of those locations with a took steps to reach. Doing this manually will get boring, so we can simply keep going until an objective is reached:
0121 def wave_itterate(w, wave, i = 0, max_steps = 10, goal = None) : 0122 steps = 0 0123 while True : 0124 # Is the goal reached? 0125 if wave[goal[1]][goal[0]] >= 0 : break 0126 wave = wave_step(w, wave, i = i) 0127 steps += 1 0128 if steps >= max_steps : break 0129 i += 1 0130 return wave 0131 0132 wave = wave_itterate(w, wave, i = 1, max_steps = len(w) * len(w), goal = nodes[0]) 0133 overlay = wave_to_overlay(wave) 0134 print(env_graphic(w, r, o = overlay).to_uri())
And now we see that the search continues until the first node objective is reached!
Now, what do we do with this wave mapping?
It turns out that you can perform optimal path planning. If you start at the objective and move into the next neighbouring patch decremented by , it turns out that you can optimally plan your path.
0135 def wave_path(wave, goal) : 0136 opts = [ 0137 (-1, 0), # Left 0138 ( 1, 0), # Right 0139 ( 0, -1), # Up 0140 ( 0, 1), # Down 0141 ] 0142 i = wave[goal[1]][goal[0]] 0143 path = [] 0144 pos = goal 0145 while i > 0 : 0146 path += [ ( pos[0], pos[1], str(i) ) ] 0147 i -= 1 0148 # Search around path position for the next step 0149 for opt in opts : 0150 x = pos[0] + opt[0] 0151 y = pos[1] + opt[1] 0152 if wave[y][x] == i : 0153 pos = (x, y) 0154 continue 0155 return path 0156 0157 path = wave_path(wave, goal = nodes[0]) 0158 print(env_graphic(w, r, o = path).to_uri())
Note that you can often generate more than one optimal path, but we ignore this for now.
The step distance between two points is actually easy to calculate, it is simply the value after the search, i.e.:
0159 goal = nodes[0] 0160 print(wave[goal[1]][goal[0]])
6
Let’s instead find an optimal path to the second node. Let’s start by calculating the wavefront:
0161 wave = wave_create(w, r) 0162 wave = wave_itterate(w, wave, i = 0, max_steps = len(w) * len(w), goal = nodes[1]) 0163 overlay = wave_to_overlay(wave) 0164 print(env_graphic(w, r, o = overlay).to_uri())
And now for an optimal path:
0165 path = wave_path(wave, goal = nodes[1]) 0166 print(env_graphic(w, r, o = path).to_uri())
Let us add another node manually:
0167 w[6][1] = '@' 0168 print(env_graphic(w, r, o = []).to_uri())
Let us the update the list of nodes:
0169 nodes = map_get(w, '@') 0170 print(nodes)
[(5, 3), (1, 6), (6, 6)]
We want to figure out the connections between each of the nodes. We first define the connections:
0171 # NOTE: The symmetry has been removed. 0172 conns = [ 0173 ( r, nodes[0], -1 ), 0174 ( r, nodes[1], -1 ), 0175 ( r, nodes[2], -1 ), 0176 #( nodes[0], r, -1 ), 0177 ( nodes[0], nodes[1], -1 ), 0178 ( nodes[0], nodes[2], -1 ), 0179 #( nodes[1], r, -1 ), 0180 #( nodes[1], nodes[0], -1 ), 0181 ( nodes[1], nodes[2], -1 ), 0182 #( nodes[2], r, -1 ), 0183 #( nodes[2], nodes[0], -1 ), 0184 #( nodes[2], nodes[1], -1 ), 0185 ]
Now we want to print our graph:
0186 def env_graphic_graph(w, r, o = [], n = [], c = []) : 0187 # Add nodes to overlay 0188 o += [ ( r[0], r[1], 'X' ) ] 0189 for node in n : 0190 o += [ ( node[0], node[1], 'X' ) ] 0191 # Normal graphic 0192 g = env_graphic(w, r, o = o) 0193 # Overlay connections 0194 z = 32 0195 for conn in c : 0196 a = conn[0] 0197 b = conn[1] 0198 m = (a[0] + ((b[0] - a[0]) / 2), a[1] + ((b[1] - a[1]) / 2)) 0199 wt = conn[2] 0200 g.add("line", {"x1": (a[0] + 0.5) * z, "y1": (a[1] + 0.5) * z, "x2": (b[0] + 0.5) * z, "y2": (b[1] + 0.5) * z, "style": 'stroke:#F0F;stroke-width:2;'}) 0201 g.add("rect", {"x": (m[0] - 0.5) * z, "y": (m[1] + 0.25) * z, "width": z * 2, "height": z * 0.5, "fill": "#0F08", "stroke": "#0000"}) 0202 g.add("text", {"x": (m[0] + 0.5) * z, "y": (m[1] + 0.5) * z, "dominant-baseline": "middle", "text-anchor": "middle", "fill": "#F00", "value": str(wt)}) 0203 return g 0204 0205 print(env_graphic_graph(w, r, o = [], n = nodes, c = conns).to_uri())
Cool. Now let’s calculate the distance for each connection and visualise:
0206 for i in range(len(conns)) : 0207 conn = conns[i] 0208 a = conn[0] 0209 b = conn[1] 0210 wave = wave_create(w, a) 0211 wave = wave_itterate(w, wave, i = 0, max_steps = len(w) * len(w), goal = b) 0212 wt = wave[b[1]][b[0]] 0213 conns[i] = ( a, b, wt ) 0214 0215 print(env_graphic_graph(w, r, o = [], n = nodes, c = conns).to_uri())
Now we have a graph representation of the environment!
We have a series of weights for connections between nodes, but we want to know which to select first.
We want to get all connections relevant to a specific node, in this case the location of the agent :
0216 def get_node_conns(c, n) : 0217 node_conns = [] 0218 for conn in c : 0219 a = conn[0] 0220 b = conn[1] 0221 if (a[0] == n[0] and a[1] == n[1]) or (b[0] == n[0] and b[1] == n[1]) : 0222 node_conns += [ conn ] 0223 return node_conns 0224 0225 print(get_node_conns(conns, r))
[((1, 1), (5, 3), 6), ((1, 1), (1, 6), 7), ((1, 1), (6, 6), 10)]
The greedy algorithm says that we should pick the one with the smallest weight, then remove that node/connections from the list. This may look as so:
0226 def alg_greedy(c, n) : 0227 conns = c[:] # Make shallow copy 0228 path = [ n ] 0229 while len(conns) > 0 : 0230 node_conns = get_node_conns(conns, path[-1]) 0231 smallest = node_conns[0] 0232 for nc in node_conns : 0233 if nc[2] < smallest[2] : 0234 smallest = nc 0235 if smallest[0] != path[-1] : 0236 path += [ smallest[0] ] 0237 else : 0238 path += [ smallest[1] ] 0239 for nc in node_conns : 0240 conns.remove(nc) 0241 return path 0242 0243 path_greedy = alg_greedy(conns, r) 0244 for n in path_greedy : 0245 print(n)
(1, 1) (5, 3) (6, 6) (1, 6)
Drawn onto our graph it looks something like this:
0246 def path_to_graph(p) : 0247 graph = [] 0248 for i in range(1, len(p)) : 0249 graph += [ ( (p[i-1][0], p[i-1][1]), (p[i][0], p[i][1]), i ) ] 0250 return graph 0251 0252 graph = path_to_graph(path_greedy) 0253 print(env_graphic_graph(w, r, o = [], n = nodes, c = graph).to_uri())
And there we have it, our greedy path, with the order in which the paths are taken.
Essentially any graph solution used by problems such as the Travelling Salesman Problem (TSP) can be computed in this graph representation.