Coffee Space


Wavefront Algorithm

This is an explanation of the so-called Wavefront Expansion Algorithm, and how it applies to a discrete patch world.

Environment

We need an environment so that we may test and visualise our problem.

Creation

First we must define our environment W that our agent R is to explore:

import random
random.seed(0) # For the purpose of reproducing the result

def env_create(width = 5, height = 5, robot = (1, 1), nodes = 5, walls = []) :
  # Create basic world, stored as world[y][x]
  world = []
  for y in range(height) :
    world += [ [] ]
    for x in range(width) :
      world[y] += [ ' ' ]
  # Add border
  for y in range(height) :
    world[y][0] = '#'
    world[y][width - 1] = '#'
  for x in range(width) :
    world[0][x] = '#'
    world[height - 1][x] = '#'
  # Add walls, given as (x, y), skipping the robot's patch
  for wall in walls :
    print(wall) # TODO
    if not (wall[1] == robot[1] and wall[0] == robot[0]) and world[wall[1]][wall[0]] == ' ' :
      world[wall[1]][wall[0]] = '#'
  # Add nodes
  # TODO: In theory this could result in an infinite loop.
  # NOTE: This check rejects any patch sharing a row or a column with the
  # robot, which is stricter than only excluding the robot's own patch.
  while nodes > 0 :
    i = random.randint(0, width - 1)
    j = random.randint(0, height - 1)
    if j != robot[1] and i != robot[0] and world[j][i] == ' ' :
      world[j][i] = '@'
      nodes -= 1
  return world

r = (1, 1)
w = env_create(8, 8, r, 2, [(1,2), (4,2), (4,3), (4,4), (2,4)])
(1, 2)
(4, 2)
(4, 3)
(4, 4)
(2, 4)

This covers the basics: a width, a height, the location of the robot R (nothing can be placed here), a number of randomly distributed nodes (areas to be visited), and walls (obstacles to be overlaid). A node can be placed randomly on any empty patch; note that, as written, the placement check also skips patches that share a row or a column with the robot.
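The TODO in the listing points out that the random placement loop may never finish if too few free patches exist. One way around that, sketched here on the assumption that the world is a list of lists of single characters as in env_create, is to collect the free patches first and sample from them. The name place_nodes and its rng parameter are hypothetical and not part of the original code, and this variant will not reproduce the node positions shown in this article.

```python
import random

def place_nodes(world, robot, count, rng=random):
    """Place `count` nodes ('@') on distinct free patches, never on the robot."""
    # Gather every empty patch as (x, y), leaving out the robot's own patch.
    free = [
        (x, y)
        for y, row in enumerate(world)
        for x, cell in enumerate(row)
        if cell == ' ' and (x, y) != tuple(robot)
    ]
    if count > len(free):
        raise ValueError("not enough free patches for the requested nodes")
    # Sampling without replacement always terminates.
    chosen = rng.sample(free, count)
    for x, y in chosen:
        world[y][x] = '@'
    return chosen

# Tiny 4x4 world: a border of walls around four free patches.
demo = [list(row) for row in ["####", "#  #", "#  #", "####"]]
placed = place_nodes(demo, robot=(1, 1), count=2, rng=random.Random(0))
```

Raising an error when the request cannot be met is a design choice; returning fewer nodes would work as well.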

Visualisation

We want a way to visualise the environment we created:

def env_graphic(w, r, o = []) :
  width = len(w[0])
  height = len(w)
  z = 32 # Size of one patch in pixels
  # NOTE: Svgg is an SVG helper class that is not defined in this article.
  g = Svgg(width = width * z, height = height * z, bg = "#FFF")
  # Print the world
  for j in range(height) :
    for i in range(width) :
      bg = "#FFF"
      if w[j][i] == '#' : bg = "#222"
      if w[j][i] == '@' : bg = "#F80"
      # The robot is stored as (x, y), so compare x with i and y with j
      if r[0] == i and r[1] == j : bg = "#00B"
      g.add("rect", {"x": i * z, "y": j * z, "width": z, "height": z, "fill": bg, "stroke": "#000"})
  # Add text overlay, each entry being (x, y, text)
  for i in range(len(o)) :
    x = o[i][0]
    y = o[i][1]
    t = o[i][2]
    g.add("text", {"x": (x + 0.5) * z, "y": (y + 0.5) * z, "dominant-baseline": "middle", "text-anchor": "middle", "fill": "#F00", "value": t})
  return g

print(env_graphic(w, r, o = [(r[0], r[1], 'X')]).to_uri())

In this visualisation, white spaces are where the agent R can travel, dark grey spaces are obstacles, blue is the agent R, and yellow/orange spaces are the nodes.
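The SVG helper used for the visualisation is not defined in this article, so a plain-text renderer may be handy when following along. This is only a sketch: env_text is a hypothetical helper, and it assumes the same world[y][x] layout and (x, y, text) overlay entries as env_graphic.

```python
def env_text(world, robot, overlay=()):
    """Render the world as text; overlay entries are (x, y, label) tuples."""
    rows = [list(row) for row in world]   # copy so the world is left untouched
    rows[robot[1]][robot[0]] = 'R'        # the robot is stored as (x, y)
    for x, y, label in overlay:
        # Keep only the last character so the columns stay aligned.
        rows[y][x] = (str(label) or ' ')[-1]
    return "\n".join("".join(row) for row in rows)

demo = ["#####", "#  @#", "# # #", "#   #", "#####"]
print(env_text(demo, robot=(1, 1)))
```

For the small demo world this prints the border, the robot as R in the top-left free patch, and the node as @.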

Tools

We will also want some tools to help us.

The first tool searches the environment and creates a list of all the matches. To retrieve a list of nodes, we search for the ‘@’ symbol like so:

def map_get(w, s) :
  found = []
  for j in range(len(w)) :
    for i in range(len(w[0])) :
      if w[j][i] == s :
        found += [ (i, j) ]
  return found

nodes = map_get(w, '@')
print(nodes)
[(5, 3), (6, 6)]

Next, a function that describes the available actions an agent R can take from a given location:

def env_opts(w, r) :
  opts = [
    (-1,  0), # Left
    ( 1,  0), # Right
    ( 0, -1), # Up
    ( 0,  1), # Down
  ]
  # Remove options not available
  for o in opts[:] :
    r_ = ( r[0] + o[0], r[1] + o[1] )
    if w[r_[1]][r_[0]] == '#' :
      opts.remove(o)
  return opts

opts = env_opts(w, r)
print(opts)
[(1, 0)]

As you can see, the only available option is to go right.

Wavefront

The wavefront algorithm expands a counter outwards from a given starting location, one step at a time, until a goal is reached. The counter in each location records how many moves were needed to reach it, and a location that already holds a counter is never overwritten.
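Described this way, the wavefront expansion is a breadth-first search over the patches. As a compact sketch of the same idea, the block below uses a queue from the standard library. The names bfs_distances and GRID are hypothetical; GRID is a hand-transcribed copy of the 8 by 8 world built earlier, and unlike the step-by-step version in this article the search here covers every reachable patch rather than stopping at a goal.

```python
from collections import deque

# Assumed transcription of the article's world: '#' wall, '@' node, ' ' free.
GRID = [
    "########",
    "#      #",
    "##  #  #",
    "#   #@ #",
    "# # #  #",
    "#      #",
    "#     @#",
    "########",
]

def bfs_distances(grid, origin):
    """Return a map of step counts from origin; -1 marks unreached patches."""
    dist = [[-1] * len(grid[0]) for _ in grid]
    dist[origin[1]][origin[0]] = 0
    queue = deque([origin])
    while queue:
        x, y = queue.popleft()
        for dx, dy in ((-1, 0), (1, 0), (0, -1), (0, 1)):
            nx, ny = x + dx, y + dy
            # The '#' border keeps nx, ny inside the grid for interior origins.
            if grid[ny][nx] != '#' and dist[ny][nx] < 0:
                dist[ny][nx] = dist[y][x] + 1   # never overwrite a counter
                queue.append((nx, ny))
    return dist

dist = bfs_distances(GRID, (1, 1))
print(dist[3][5])  # steps from the robot at (1, 1) to the node at (5, 3)
```

The queue guarantees that patches are visited in order of increasing counter, which is what processing one wavefront at a time achieves.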

Let us create a counter map for the world:

def wave_create(w, origin = (0, 0)) :
  # Named wave_map to avoid shadowing the built-in map()
  wave_map = []
  for y in range(len(w)) :
    wave_map += [ [] ]
    for x in range(len(w[0])) :
      wave_map[y] += [ -1 ]
  # Place a zero at origin
  wave_map[origin[1]][origin[0]] = 0
  return wave_map

wave = wave_create(w, r)

And now let’s convert this to an overlay and visualise it:

def wave_to_overlay(m) :
  overlay = []
  for y in range(len(m)) :
    for x in range(len(m[0])) :
      if m[y][x] >= 0 :
        overlay += [ (x, y, str(m[y][x])) ]
  return overlay

overlay = wave_to_overlay(wave)
print(env_graphic(w, r, o = overlay).to_uri())

Not too exciting.

Now we find all of the locations with a 0 and, for each of them, work out the available moves and mark every position that has no counter yet with a 1. This means that each of those positions took 1 step to reach.

def wave_step(w, wave, i = 0) :
  wave_front = map_get(wave, i)
  for front in wave_front :
    opts = env_opts(w, front)
    for opt in opts :
      x = front[0] + opt[0]
      y = front[1] + opt[1]
      if wave[y][x] < 0 :
        wave[y][x] = i + 1
  return wave

wave = wave_step(w, wave, i = 0)
overlay = wave_to_overlay(wave)
print(env_graphic(w, r, o = overlay).to_uri())

Okay cool, the only available option from that position is explored. Now with the next step we expect a 2 to be placed to the right of and below the 1:

wave = wave_step(w, wave, i = 1)
overlay = wave_to_overlay(wave)
print(env_graphic(w, r, o = overlay).to_uri())

Cool! Each of those locations with a 2 took 2 steps to reach. Doing this manually will get boring, so we can simply keep going until an objective is reached:

def wave_itterate(w, wave, i = 0, max_steps = 10, goal = None) :
  steps = 0
  while True :
    # Is the goal reached? (With no goal, run until max_steps.)
    if goal is not None and wave[goal[1]][goal[0]] >= 0 : break
    wave = wave_step(w, wave, i = i)
    steps += 1
    if steps >= max_steps : break
    i += 1
  return wave

wave = wave_itterate(w, wave, i = 1, max_steps = len(w) * len(w[0]), goal = nodes[0])
overlay = wave_to_overlay(wave)
print(env_graphic(w, r, o = overlay).to_uri())

And now we see that the search continues until the first node objective is reached!

Now, what do we do with this wave mapping?

Path Planning

It turns out that this map gives you optimal path planning. Start at the objective and repeatedly move into a neighbouring patch whose counter is lower by 1; when you reach 0 you have traced a shortest path back to the start.

def wave_path(wave, goal) :
  opts = [
    (-1,  0), # Left
    ( 1,  0), # Right
    ( 0, -1), # Up
    ( 0,  1), # Down
  ]
  i = wave[goal[1]][goal[0]]
  path = []
  pos = goal
  while i > 0 :
    path += [ ( pos[0], pos[1], str(i) ) ]
    i -= 1
    # Search around path position for the next step
    for opt in opts :
      x = pos[0] + opt[0]
      y = pos[1] + opt[1]
      if wave[y][x] == i :
        pos = (x, y)
        break # Take the first matching neighbour and stop searching
  return path

path = wave_path(wave, goal = nodes[0])
print(env_graphic(w, r, o = path).to_uri())

Note that you can often generate more than one optimal path, but we ignore this for now.
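To make that remark concrete, the number of distinct optimal paths can be counted directly from a counter map: the number of shortest paths into a patch is the sum of the counts of its neighbours whose counter is lower by 1. The sketch below assumes a counter map in the format produced above (-1 for unvisited patches, 0 at the origin); count_optimal_paths is a hypothetical helper and the small map is hand-written for illustration.

```python
def count_optimal_paths(wave, goal):
    """Count shortest paths from the origin (counter 0) to goal."""
    height, width = len(wave), len(wave[0])
    # Visit patches by increasing counter so predecessors are counted first.
    cells = sorted(
        (wave[y][x], x, y)
        for y in range(height) for x in range(width)
        if wave[y][x] >= 0
    )
    ways = {}
    for value, x, y in cells:
        if value == 0:
            ways[(x, y)] = 1          # exactly one way to be at the origin
            continue
        total = 0
        for dx, dy in ((-1, 0), (1, 0), (0, -1), (0, 1)):
            nx, ny = x + dx, y + dy
            inside = 0 <= nx < width and 0 <= ny < height
            if inside and wave[ny][nx] == value - 1:
                total += ways[(nx, ny)]
        ways[(x, y)] = total
    return ways.get(tuple(goal), 0)   # 0 if the goal was never reached

# Counter map of an open 3x3 room inside a border, origin at (1, 1).
demo_wave = [
    [-1, -1, -1, -1, -1],
    [-1,  0,  1,  2, -1],
    [-1,  1,  2,  3, -1],
    [-1,  2,  3,  4, -1],
    [-1, -1, -1, -1, -1],
]
print(count_optimal_paths(demo_wave, (3, 3)))  # prints 6
```

In the open room there are 6 shortest routes to the far corner, one for each way of ordering two moves right and two moves down.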

Distance

The step distance between two points is easy to calculate: it is simply the counter value at the goal after the search, i.e.:

goal = nodes[0]
print(wave[goal[1]][goal[0]])
6

Different Node

Let’s instead find an optimal path to the second node. Let’s start by calculating the wavefront:

wave = wave_create(w, r)
wave = wave_itterate(w, wave, i = 0, max_steps = len(w) * len(w[0]), goal = nodes[1])
overlay = wave_to_overlay(wave)
print(env_graphic(w, r, o = overlay).to_uri())

And now for an optimal path:

path = wave_path(wave, goal = nodes[1])
print(env_graphic(w, r, o = path).to_uri())

Graph Representation

Let us add another node manually:

w[6][1] = '@' # The world is indexed as w[y][x], so this is the patch (1, 6)
print(env_graphic(w, r, o = []).to_uri())

Let us then update the list of nodes:

nodes = map_get(w, '@')
print(nodes)
[(5, 3), (1, 6), (6, 6)]

We want to figure out the connections between each of the nodes. We first define the connections:

# NOTE: The symmetry has been removed.
conns = [
  (        r, nodes[0], -1 ),
  (        r, nodes[1], -1 ),
  (        r, nodes[2], -1 ),
  #( nodes[0],        r, -1 ),
  ( nodes[0], nodes[1], -1 ),
  ( nodes[0], nodes[2], -1 ),
  #( nodes[1],        r, -1 ),
  #( nodes[1], nodes[0], -1 ),
  ( nodes[1], nodes[2], -1 ),
  #( nodes[2],        r, -1 ),
  #( nodes[2], nodes[0], -1 ),
  #( nodes[2], nodes[1], -1 ),
]
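Writing the connections by hand becomes tedious as nodes are added. As a sketch, the same list of unordered pairs can be generated with itertools.combinations from the standard library; all_conns is a hypothetical helper, and the robot and node positions are copied from the output above.

```python
from itertools import combinations

def all_conns(points):
    """Return one (a, b, -1) entry per unordered pair of points."""
    # combinations() yields each pair once, so the symmetry is already removed.
    return [(a, b, -1) for a, b in combinations(points, 2)]

r = (1, 1)
nodes = [(5, 3), (1, 6), (6, 6)]
conns = all_conns([r] + nodes)
for conn in conns:
    print(conn)
```

The pairs come out in the same order as the hand-written list, with -1 as the placeholder weight.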

Now we want to print our graph:

def env_graphic_graph(w, r, o = [], n = [], c = []) :
  # Copy the overlay so neither the caller's list nor the shared default is modified
  o = o[:]
  # Add nodes to overlay
  o += [ ( r[0], r[1], 'X' ) ]
  for node in n :
    o += [ ( node[0], node[1], 'X' ) ]
  # Normal graphic
  g = env_graphic(w, r, o = o)
  # Overlay connections
  z = 32
  for conn in c :
    a = conn[0]
    b = conn[1]
    # Midpoint of the connection, where the weight label is drawn
    m = (a[0] + ((b[0] - a[0]) / 2), a[1] + ((b[1] - a[1]) / 2))
    wt = conn[2]
    g.add("line", {"x1": (a[0] + 0.5) * z, "y1": (a[1] + 0.5) * z, "x2": (b[0] + 0.5) * z, "y2": (b[1] + 0.5) * z, "style": 'stroke:#F0F;stroke-width:2;'})
    g.add("rect", {"x": (m[0] - 0.5) * z, "y": (m[1] + 0.25) * z, "width": z * 2, "height": z * 0.5, "fill": "#0F08", "stroke": "#0000"})
    g.add("text", {"x": (m[0] + 0.5) * z, "y": (m[1] + 0.5) * z, "dominant-baseline": "middle", "text-anchor": "middle", "fill": "#F00", "value": str(wt)})
  return g

print(env_graphic_graph(w, r, o = [], n = nodes, c = conns).to_uri())

Cool. Now let’s calculate the distance for each connection and visualise:

for i in range(len(conns)) :
  conn = conns[i]
  a = conn[0]
  b = conn[1]
  # Run a fresh wavefront from a until b is reached
  wave = wave_create(w, a)
  wave = wave_itterate(w, wave, i = 0, max_steps = len(w) * len(w[0]), goal = b)
  wt = wave[b[1]][b[0]]
  conns[i] = ( a, b, wt )

print(env_graphic_graph(w, r, o = [], n = nodes, c = conns).to_uri())

Now we have a graph representation of the environment!

Greedy Algorithm

We have a series of weights for connections between nodes, but we want to know which to select first.

We want to get all connections relevant to a specific node, in this case the location of the agent R:

def get_node_conns(c, n) :
  node_conns = []
  for conn in c :
    a = conn[0]
    b = conn[1]
    if (a[0] == n[0] and a[1] == n[1]) or (b[0] == n[0] and b[1] == n[1]) :
      node_conns += [ conn ]
  return node_conns

print(get_node_conns(conns, r))
[((1, 1), (5, 3), 6), ((1, 1), (1, 6), 7), ((1, 1), (6, 6), 10)]

The greedy algorithm says that we should pick the connection with the smallest weight, move to that node, and then remove the node we just left (and its connections) from the list. This might look like so:

def alg_greedy(c, n) :
  conns = c[:] # Make shallow copy
  path = [ n ]
  while len(conns) > 0 :
    node_conns = get_node_conns(conns, path[-1])
    if len(node_conns) == 0 : break # Nothing left to reach from here
    smallest = node_conns[0]
    for nc in node_conns :
      if nc[2] < smallest[2] :
        smallest = nc
    # Move to whichever end of the connection we are not already at
    if smallest[0] != path[-1] :
      path += [ smallest[0] ]
    else :
      path += [ smallest[1] ]
    # Drop every connection of the node we just left
    for nc in node_conns :
      conns.remove(nc)
  return path

path_greedy = alg_greedy(conns, r)
for n in path_greedy :
  print(n)
(1, 1)
(5, 3)
(6, 6)
(1, 6)

Drawn onto our graph it looks something like this:

def path_to_graph(p) :
  graph = []
  for i in range(1, len(p)) :
    graph += [ ( (p[i-1][0], p[i-1][1]), (p[i][0], p[i][1]), i ) ]
  return graph

graph = path_to_graph(path_greedy)
print(env_graphic_graph(w, r, o = [], n = nodes, c = graph).to_uri())

And there we have it, our greedy path, with the order in which the paths are taken.

Essentially any graph algorithm used for problems such as the Travelling Salesman Problem (TSP) can be applied to this graph representation.
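As one illustration, with only three nodes every visiting order can be tried exhaustively and compared against the greedy result. This is a sketch under stated assumptions: best_order is a hypothetical helper, the three weights from the robot are the ones printed earlier, and the three node-to-node weights were worked out by hand on the same map rather than taken from the article's output. The number of orders grows factorially, so this is only practical for a handful of nodes.

```python
from itertools import permutations

def best_order(start, nodes, weights):
    """Try every visiting order; return (cost, path) for the cheapest one."""
    def dist(a, b):
        # Each connection is stored once per unordered pair.
        return weights[(a, b)] if (a, b) in weights else weights[(b, a)]

    best = None
    for order in permutations(nodes):
        path = [start] + list(order)
        # Open path: the agent does not return to the start.
        cost = sum(dist(a, b) for a, b in zip(path, path[1:]))
        if best is None or cost < best[0]:
            best = (cost, path)
    return best

r = (1, 1)
nodes = [(5, 3), (1, 6), (6, 6)]
weights = {
    # Weights from the robot, as printed by get_node_conns earlier.
    (r, nodes[0]): 6, (r, nodes[1]): 7, (r, nodes[2]): 10,
    # Assumed: hand-computed on the same map, not shown in the article.
    (nodes[0], nodes[1]): 7, (nodes[0], nodes[2]): 4, (nodes[1], nodes[2]): 5,
}
cost, path = best_order(r, nodes, weights)
print(cost, path)
```

Under these weights the cheapest order costs 15 steps and visits the nodes in the same order as the greedy path; that agreement is a property of this small map, not of greedy selection in general.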