These examples illustrate algorithms implemented by nondeterministic atomic actions. These examples are from the book, "Parallel Program Design: A Foundation" by Chandy & Misra and use ideas from UNITY theory. This section discusses the following concepts.

  1. Agents. An agent is any object that has a method called next(). An agent is either active or idle. An idle agent does nothing until it is activated. An active agent executes its next() step at some future point. An agent x is activated by calling activate(x).

  2. Incremental Computations. Sense-and-respond applications monitor data structures that last forever. These applications continuously compute parameters of these structures. For example, an application may monitor a set of objects, such as images, and create clusters of similar objects using a clustering algorithm. As objects are added, deleted or modified, the clusters change. After a small change to a persistent data structure, the parameters can be computed incrementally: start with the parameters before the change and adjust them. For example, re-sorting a list after one element changes can start from the list as it was sorted before the change.

  3. Concurrent computation design. "Parallel Program Design: A Foundation" shows how to construct parallel programs for different types of concurrent systems by first focusing on the key elementary operations and then determining how to orchestrate these elementary operations to best fit a target platform. For example, the elementary operation for sorting is to flip out-of-order pairs, and the elementary operation for computing shortest paths is to establish the shortest path for a triangle. Later steps determine how to order these operations for multicore and distributed platforms.

These examples also illustrate signal streams and shared objects.

Signal Streams

A value is appended to a signal stream when a shared variable changes value. Agents read signal streams to determine when shared variables change value. Usually True or 1 is the only value appended to signal streams, though any other value can be used. When an agent changes a variable that it shares with other agents, it appends True or 1 to a signal stream that informs other agents that this variable has changed.

Let’s look at the following examples for incremental computation using agents: all-points shortest paths and sorting. We first describe the examples using shared objects and later describe programs that use signal streams.

All-Points Shortest Path using Shared Variables

Given a directed weighted graph specified by a matrix d where d[i,j] is the weight of the edge (i, j), modify d so that d[i,j] becomes the length of the shortest path from vertex i to vertex j. This example is directly from Parallel Program Design: A Foundation and it illustrates a way of designing concurrent computations.

The elementary operation

The elementary operation is establishing the triangle inequality. For any vertices i, j, k:

d[i, k] <= d[i, j] + d[j,k]

So, for each ordered triple (i, j, k), we create an agent responsible for establishing its triangle inequality.

If the agent (i, j, k) decreases d[i,k], then the agent activates the agents (i, k, r) and (r, i, k) for all r. This is because if d[i,k] has decreased, then the triangle inequality has to be established for triangles (i, k, r) and (r, i, k).

We create a triangle inequality class:

class triangle_inequality(object):
    def __init__(self, d, i, j, k):
        self.d, self.i, self.j, self.k = d, i, j, k
        self.actors = None
    def next(self):
        if self.d[self.i][self.k] > self.d[self.i][self.j] + self.d[self.j][self.k]:
            self.d[self.i][self.k] = self.d[self.i][self.j] + self.d[self.j][self.k]
            for r in range(len(self.d)):
                activate(self.actors[self.i][self.k][r])
                activate(self.actors[r][self.i][self.k])

The next() method establishes the triangle inequality for (i, j, k) and activates agents (i, k, r) and (r, i, k) when d[i, k] changes.

The shortest path program merely creates and activates agents for every triple, and then calls run() to start the agents.

def shortest_path(d):
    R = range(len(d))
    actors = [[[[] for k in R] for j in R] for i in R]
    for i in R:
        for j in R:
            for k in R:
                actors[i][j][k] = triangle_inequality(d, i, j, k)
                actors[i][j][k].actors = actors
                activate(actors[i][j][k])
    run()
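To see the activation rule at work without the agent framework, here is a minimal sketch that replaces the agents with an explicit worklist of triples. The function name shortest_path_worklist and the sample graph are illustrative assumptions, not part of the original program. The sketch also assumes that d[i][i] is 0 and that the graph has no negative cycles, so the loop terminates.

```python
from collections import deque

INF = float("inf")

def shortest_path_worklist(d):
    """Establish d[i][k] <= d[i][j] + d[j][k] for every triple, in place."""
    n = len(d)
    # Every triple starts active, as in shortest_path() above.
    pending = deque((i, j, k) for i in range(n) for j in range(n) for k in range(n))
    queued = set(pending)
    while pending:
        i, j, k = pending.popleft()
        queued.discard((i, j, k))
        if d[i][k] > d[i][j] + d[j][k]:
            d[i][k] = d[i][j] + d[j][k]
            # d[i][k] decreased: recheck the triangles that use it as a leg.
            for r in range(n):
                for triple in ((i, k, r), (r, i, k)):
                    if triple not in queued:
                        queued.add(triple)
                        pending.append(triple)
    return d

d = [[0, 4, 1],
     [INF, 0, INF],
     [INF, 2, 0]]
shortest_path_worklist(d)
print(d[0][1])  # 3, reached through vertex 2
```

The worklist order is arbitrary; any order that eventually processes every active triple reaches the same final matrix.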

Sorting using shared variables

This example — sort a list — is also directly from Parallel Program Design: A Foundation.

The elementary operation

The elementary operation is to establish that a pair of adjacent elements are in order:

the_list[i] <= the_list[i+1]

where the_list is the list to be sorted in place, and i is an index into the list.

We create an agent i that is responsible for establishing this condition. If agent i changes the_list[i] and the_list[i+1] then it activates agents i-1 and i+1 to establish conditions for pairs (i-1, i) and (i+1, i+2).

class sort_pair(object):
    def __init__(self, the_list, my_index):
        self.the_list, self.my_index = the_list, my_index
        self.actors = None
        self.N = len(self.the_list)
    def next(self):
        if self.the_list[self.my_index] > self.the_list[self.my_index+1]:
            self.the_list[self.my_index], self.the_list[self.my_index+1] = \
              self.the_list[self.my_index+1], self.the_list[self.my_index]
            if self.my_index > 0: activate(self.actors[self.my_index-1])
            if self.my_index < self.N-2: activate(self.actors[self.my_index+1])

The program merely creates and activates an agent for each pair (i, i+1), and calls run().

def sort_simple(the_list):
    N = len(the_list)
    actors = [sort_pair(the_list, my_index) for my_index in range(N-1)]
    for actor in actors:
        actor.actors = actors
        activate(actor)
    run()
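One way to see why this computation terminates: each swap of an out-of-order adjacent pair removes exactly one inversion from the list, so the number of swaps equals the initial number of inversions. The sketch below checks this with a plain worklist of agent indices in place of the agent framework; the function names are illustrative assumptions.

```python
from collections import deque

def count_inversions(a):
    """Number of index pairs x < y with a[x] > a[y]."""
    return sum(1 for x in range(len(a))
               for y in range(x + 1, len(a)) if a[x] > a[y])

def sort_counting_swaps(the_list):
    """Sort in place with adjacent flips; return the number of swaps made."""
    n = len(the_list)
    active = deque(range(n - 1))  # agent i guards the pair (i, i+1)
    swaps = 0
    while active:
        i = active.popleft()
        if the_list[i] > the_list[i + 1]:
            the_list[i], the_list[i + 1] = the_list[i + 1], the_list[i]
            swaps += 1
            # The neighboring pairs may now be out of order.
            if i > 0:
                active.append(i - 1)
            if i < n - 2:
                active.append(i + 1)
    return swaps

data = [4, 1, 3, 2]
before = count_inversions(data)
swaps = sort_counting_swaps(data)
print(data, swaps == before)  # [1, 2, 3, 4] True
```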

Next, we discuss the same examples using signal streams.

Sorting using signal streams

The program using signal streams is similar to that using shared variables; the difference is that an agent puts a signal (either 1 or True) on a signal stream to indicate a change. Agents reading a signal stream respond to reading a new signal by taking a next step.

def sort(lst):
    # flip elements that are out of order.
    def flip(i):
        if lst[i-1] > lst[i]:
            lst[i-1], lst[i] = lst[i], lst[i-1]
            return 1
        else:
            return _no_value
    # Create streams
    S = [Stream() for i in range(len(lst)+1)]
    # Create agents
    for i in range(1, len(lst)):
        signal_element(
           func=flip, 
           in_stream=weave_f([S[i-1],S[i+1]]), 
           out_stream=S[i], i=i)
    # Insert data to start agents
    for stream in S: stream.append(1)
    run()

The flip() function returns 1 if execution of the function changes the list and returns _no_value otherwise. The agents are indexed i in the interval [1, …, N-1] where N is the length of the list. Agent i puts lst[i-1] and lst[i] in increasing order. Agent i outputs a value on stream S[i] whenever it changes lst[i-1] or lst[i]. Agent i reads an input stream which is an asynchronous merge of streams S[i-1] and S[i+1]. If either S[i-1] or S[i+1] has a new value appended to it, agent i is woken up and takes a step. So agent i is woken up if agent i-1 changed lst[i-1]. Agent i is also woken up if agent i+1 changed lst[i].

Initially, all the agents are woken up because 1 is appended to all streams. The computation terminates when no streams are extended and therefore all agents become quiescent.
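The wiring between agents and streams can be simulated in plain Python. This is a minimal sketch that stands in for the stream machinery, not the library's implementation: a queue holds the index s of each stream S[s] that has received a signal, and the name sort_with_signals is an illustrative assumption.

```python
from collections import deque

def sort_with_signals(lst):
    n = len(lst)

    def flip(i):
        """Put lst[i-1] and lst[i] in order; report whether the list changed."""
        if lst[i - 1] > lst[i]:
            lst[i - 1], lst[i] = lst[i], lst[i - 1]
            return True
        return False

    # Each queue entry is the index s of a stream S[s] that received a signal.
    # Initially every stream S[0], ..., S[n] gets one signal.
    signals = deque(range(n + 1))
    while signals:
        s = signals.popleft()
        # Agent i reads S[i-1] and S[i+1], so S[s] wakes agents s-1 and s+1.
        for i in (s - 1, s + 1):
            if 1 <= i <= n - 1 and flip(i):
                signals.append(i)  # agent i writes to its output stream S[i]
    return lst

print(sort_with_signals([3, 1, 2]))  # [1, 2, 3]
```

The loop ends when the queue is empty, which corresponds to no stream being extended and every agent being quiescent.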

Shortest Path using Signal Streams

This program is similar to the one using shared variables. We first define the central function which is encapsulated by an agent. For a triple i, j, k, this function makes the triangle inequality hold, i.e., D[i,k] <= D[i,j] + D[j,k] where D is initially the matrix of edge distances.

    # STEP 1. DEFINE FUNCTION TO BE ENCAPSULATED
    def triangle_inequality(triple):
        i, j, k = triple
        if D[i][j] + D[j][k] < D[i][k]:
            D[i][k] = D[i][j] + D[j][k]
            return 1
        else:
            return _no_value

The next step is to create streams that indicate when an agent has changed the data structure. We have one agent for each triple (i, j, k) and one stream for each ordered pair (i, j): the stream changed[i][j] receives a signal whenever D[i][j] decreases.

    # STEP 2. CREATE STREAMS
    indices = range(len(D))
    changed = [[ Stream() for i in indices] for j in indices]

The next step is to create agents, one for each triple (i, j, k).

    # STEP 3. CREATE AGENTS
    for i in indices:
        for j in indices:
            for k in indices:
                signal_element(func=triangle_inequality,
                               in_stream=weave_f([
                                  changed[i][j], changed[j][k]]),
                               out_stream=changed[i][k],
                               triple=[i, j, k])

The agent for triple (i,j,k) gets an input stream which is an asynchronous merge of streams changed[i][j] and changed[j][k]. So, this agent is woken up whenever D[i][j] or D[j][k] decreases in value. If this agent changes D[i][k] then it appends a value (1) to the stream changed[i][k].

The final step is to start the computation by putting a value into each of the streams, which wakes up all the agents.
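The start-up step and the resulting wake-ups can be simulated without the stream library. In this minimal sketch, which is an assumption-laden stand-in rather than the library's behavior, each stream changed[i][j] is reduced to a boolean flag and agents run in rounds: an agent takes a step in a round only if one of its input streams received a signal in the previous round. The function name and sample matrix are illustrative, and the graph is assumed to have no negative cycles.

```python
INF = float("inf")

def shortest_path_with_signals(D):
    n = len(D)
    # Start-up: one initial signal on every stream changed[i][j].
    changed = [[True] * n for _ in range(n)]
    while any(any(row) for row in changed):
        # Signals from the previous round wake agents in this round.
        woken, changed = changed, [[False] * n for _ in range(n)]
        for i in range(n):
            for j in range(n):
                for k in range(n):
                    # Agent (i, j, k) reads changed[i][j] and changed[j][k].
                    if ((woken[i][j] or woken[j][k])
                            and D[i][j] + D[j][k] < D[i][k]):
                        D[i][k] = D[i][j] + D[j][k]
                        changed[i][k] = True  # signal on the output stream
    return D

# Directed cycle 0 -> 1 -> 2 -> 0 with unit weights.
D = [[0, 1, INF],
     [INF, 0, 1],
     [1, INF, 0]]
shortest_path_with_signals(D)
print(D[0][2])  # 2
```

Without the initial signals no agent would ever be woken, so D would stay unchanged.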

SORTING EXAMPLES WITH SHARED VARIABLES

In this example we partition a list into contiguous sublists, and each sublist is assigned an agent that sorts it. An agent managing a sublist interacts with agents managing neighboring sublists to ensure that the contiguous sublists are in order. We use a shared variable for each element of the list. Agents register with shared variables and an agent can activate all agents that are registered with a shared variable.

We first give the code and then describe it. An instance of this class is an agent which is responsible for sorting the sublist l[i:j], where l is the list to be sorted. signals is a list of shared variables with signals[i] associated with l[i].

class sort_2(object):
    def __init__(self, i, j, l, signals, name):
        self.i, self.j, self.l = i, j, l
        self.signals = signals
        self.name = name
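        # Register only with neighbors that exist: the first sublist has no
        # left neighbor and the last sublist has no right neighbor.
        if i > 0: signals[i-1].register(self)
        if j < len(l): signals[j].register(self)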
        activate(self)
    def next(self):
        self.l[self.i : self.j] = sorted(self.l[self.i : self.j])
        if (self.i > 0) and (self.l[self.i-1] > self.l[self.i]):
            self.l[self.i-1], self.l[self.i] = self.l[self.i], self.l[self.i-1]
            self.signals[self.i].activate()
            activate(self)
        if (self.j < len(self.l)) and (self.l[self.j-1] > self.l[self.j]):
            self.l[self.j-1], self.l[self.j] = self.l[self.j], self.l[self.j-1]
            self.signals[self.j-1].activate()
            activate(self)

A collection of sort_2 objects sorts a list l. Each object sorts a sublist l[i:j]. For example, a list of length 40 may be partitioned into 4 sublists each of length 10, and each sublist is managed by a sort_2 object.

signals is a list of Shared objects. The agent managing sublist l[i:j] needs to be informed when l[i-1] changes value, and also when l[j] changes value. So, this agent registers with the shared variables signals[i-1] and signals[j]. If the agent modifies l[i] then it activates all agents that are registered with signals[i]. In this example, the only agent registered with signals[i] is the agent responsible for the sublist to the left. Likewise, if the agent modifies l[j-1] then it activates all agents that are registered with signals[j-1]. In this example, the only agent registered with signals[j-1] is the agent responsible for the sublist to the right.

If these neighboring values are out of order, then the agent swaps the values so that they are in order, re-sorts its sublist, and informs its neighbors.
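The interaction between neighboring sublists can be sketched without the Shared class. In the following stand-in, a queue of sublist numbers replaces registration and activation; the function name sort_by_sublists and the parameter size (sublist length, assumed to be at least 1) are illustrative assumptions.

```python
from collections import deque

def sort_by_sublists(l, size):
    """Sort l in place using one worker per contiguous sublist of length size."""
    n = len(l)
    bounds = [(i, min(i + size, n)) for i in range(0, n, size)]
    active = deque(range(len(bounds)))  # every sublist agent starts active
    while active:
        b = active.popleft()
        i, j = bounds[b]
        l[i:j] = sorted(l[i:j])
        # Left boundary: swap with the left neighbor's last element if needed.
        if i > 0 and l[i - 1] > l[i]:
            l[i - 1], l[i] = l[i], l[i - 1]
            active.extend((b - 1, b))  # both sublists must be re-sorted
        # Right boundary: swap with the right neighbor's first element if needed.
        if j < n and l[j - 1] > l[j]:
            l[j - 1], l[j] = l[j], l[j - 1]
            active.extend((b, b + 1))
    return l

data = [9, 3, 7, 1, 8, 2, 6, 0, 5, 4]
print(sort_by_sublists(data, 3))  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
```

A sublist is reactivated only when one of its boundary elements is swapped, so work stays local to the sublists that actually changed.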