Your code + IoTPy: Multiple Processes

See IoTPy/concurrency/multicore.py for the code and IoTPy/tests/concurrency/test_multicore.py for tests.

This section shows how to integrate processes running non-IoTPy code with processes running IoTPy. The processes operate concurrently on shared memory using Python’s multiprocessing.Array, which is available in Python 2 and later.

(Note: Python 3.8+ allows general memory sharing. A version of IoTPy for Python 3.8+ will be released later.)

We first describe how to create a single process running IoTPy, then show how to connect multiple IoTPy processes together, and finally show how to integrate non-IoTPy processes.

An IoTPy process in a multicore application

The specification of an IoTPy multicore application is similar to the specification of a network of agents. You connect the output stream of a process to an input stream of another process. You can also specify threads that put data into streams. We first give an overview of an IoTPy process and then discuss the notation used for specifying it. Here is an example:

[Diagram: Slide1.jpg]

The function executed by a process

Each process executes a function. The function is called f in this example. The function has parameters in_streams and out_streams which are lists of input streams and output streams. The function is an agent that reads its input streams and appends values to the tails of its output streams. (Recall that an agent can be a network of agents.)

In this diagram, in_streams is a list of three streams and out_streams is a list of two streams.

Process inputs and outputs

Each process has inputs and outputs, which are lists of names of input and output streams. In this diagram, the inputs of the process are [‘u’, ‘v’, ‘w’] and the outputs are [‘y’, ‘z’]. The names ‘u’, ‘v’, and ‘w’ are associated with the streams in_streams[0], in_streams[1], and in_streams[2], respectively. Likewise, ‘y’ and ‘z’ are the names associated with the streams out_streams[0] and out_streams[1], respectively. We use names (strings) when we declare the streams connecting processes, though the function within a process operates on streams (instances of class Stream).
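
As a hedged sketch (the agent bodies and the import path are assumptions), a function matching this diagram might look as follows; the comments show how the stream names are bound to positions in in_streams and out_streams:

# Assumed import path; adjust to your IoTPy installation.
from IoTPy.agent_types.op import map_element

def f(in_streams, out_streams):
    # in_streams[0], in_streams[1], in_streams[2] are the streams
    # named 'u', 'v', 'w'; out_streams[0], out_streams[1] are 'y', 'z'.
    map_element(func=lambda v: v + 1,
                in_stream=in_streams[0], out_stream=out_streams[0])
    map_element(func=lambda v: 2 * v,
                in_stream=in_streams[1], out_stream=out_streams[1])
    # in_streams[2] (the stream named 'w') is not used in this sketch.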

Sources

A source is a stream that is fed values by a thread that interfaces with an external source of data such as a sensor, Twitter, or a file.

When values are appended to a source stream, the processes that read that stream are notified. This notification takes a little computation. We specify the process in which we want that computation to take place by associating the source stream with the process, as illustrated in the following diagram.

[Diagram: Slide2.jpg]


In the above diagram, a thread puts data into a stream called ‘x’. We associate the source ‘x’ with this process ‘p’. A source can be associated with any process. You pick the process with which a source is associated based on performance considerations: pick the process with the least load. However, the computation associated with a source is so small that it doesn’t make much difference.

Specification of a multiprocess program

The specification of an IoTPy multiprocess program is a list containing 2 items: (1) a specification of streams and (2) a specification of processes.

Specification of streams

The specification of streams is a list of 2-tuples, each consisting of a stream name and a stream data type, as shown in the following example:

[('x', 'i'), ('y', 'i')]

This example has two streams called ‘x’ and ‘y’, both of type int, ‘i’ (see Stream Data Types below). The stream names are strings. The streams with the specified names are the streams that connect processes to each other.

Stream Data Types A stream data type can be char, int, float, double, or another type listed in the Python documentation for the class multiprocessing.Array. The strings ‘c’, ‘i’, ‘f’, and ‘d’ specify the data types char, int, float, and double, respectively. The Python documentation lists more data types. These data types are required for concurrent shared memory in Python releases earlier than 3.8.

Unknown Stream Data Types You can specify a source stream data type to be ‘x’ for unknown. Source streams of unknown type are used only within the process with which they are associated. If you have an external source of data that generates JSON or YAML, you can give the source stream the unknown data type; an agent within the process then converts the values to known types.
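
For example, the following hedged sketch (the stream names, the JSON field, and the import path are assumptions) declares a source ‘raw’ of unknown type carrying JSON strings; an agent in the owning process converts each string to an int on a stream of type ‘i’:

import json

# Assumed import path; adjust to your IoTPy installation.
from IoTPy.agent_types.op import map_element

# Stream specification: 'raw' is a source of unknown type ('x');
# 'values' is a stream of ints derived from it.
stream_specification = [('raw', 'x'), ('values', 'i')]

def parse_json(in_streams, out_streams):
    # Convert each JSON string arriving on 'raw' into an int on 'values'.
    map_element(func=lambda v: int(json.loads(v)['value']),
                in_stream=in_streams[0], out_stream=out_streams[0])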

Specification of processes

The specification of a group of processes is a list containing one element for the specification of each component process. The specification of each process is a dict that can contain the following keys: ‘name’, ‘agent’, ‘inputs’, ‘outputs’, ‘sources’, ‘threads’, ‘args’, ‘keyword_args’, and ‘output_queues’. The key ‘agent’ must appear in the dict; the other keys can be omitted. If the process is not given a name, IoTPy assigns it a unique name.

The values corresponding to these keys are as follows:

  • ‘name’: the name of the process.

  • ‘inputs’, ‘outputs’ and ‘sources’: lists of names of streams.

  • ‘agent’: the function executed by the process. This function must have two parameters: in_streams, which is a list of input streams, and out_streams, which is a list of output streams. In addition, the function may have positional and keyword arguments.

  • ‘args’, ‘keyword_args’: a list of positional args and a dict of keyword args, respectively. These arguments are passed to the agent function.

  • ‘threads’: a list of threads that run in the process (see Threads below).

  • ‘output_queues’: a list of instances of multiprocessing.Queue(). IoTPy streams can be copied to output queues, and non-IoTPy processes can read these queues. If and when a multiprocess program terminates execution, a message containing the string ‘_finished’ is put in each output queue, indicating that no further values will be placed on the queue.

Example

{'name': 'p1', 'agent': g, 'inputs': ['y']}

The above example is of a process with name ‘p1’, and with an agent g that has a single input stream called ‘y’ and no output stream. An agent is a function with the following parameters: a list of input streams, a list of output streams, and possibly additional positional and keyword arguments. An example of the function is given below:

def g(inputs, outputs):
    s = Stream('s')
    map_element(func=lambda v: v*2, 
                in_stream=inputs[0], out_stream=s)
    print_stream(s)

Function g reads a single input stream, inputs[0], and doesn’t modify any output stream. The process has a single input stream, the stream called ‘y’, which is bound to inputs[0].

Another example of a process is given next.

Example

{'name': 'p0', 'agent': f, 'inputs':['x'], 'outputs': ['y'], 
'sources': ['x']}

This process has name ‘p0’ and has a single input called ‘x’, a single output called ‘y’, and a single source stream also called ‘x’. An example of the agent function f is given next.

def f(inputs, outputs):
    map_element(lambda v: v+10, 
                in_stream=inputs[0], out_stream=outputs[0])

Function f reads a single input stream, inputs[0], and modifies a single output stream, outputs[0]. The input stream, inputs[0], is bound to the stream called ‘x’ and outputs[0] to the stream called ‘y’.

Example of a specification of a multicore application

A specification of a multicore application, with functions f and g defined elsewhere, is shown in the code and diagram below.

multicore_specification = \
    [ # list of stream names and types
      [('x', 'i'), ('y', 'i')],

      [ # specification for process p0
        {'name': 'p0', 'agent': f, 
         'inputs': ['x'], 'outputs': ['y'], 'sources': ['x']},

         # specification for process p1
         {'name': 'p1', 'agent': g, 'inputs': ['y']} ] ]

[Diagram: Example of a process specification]

As we said earlier, you can associate a source with any process, and the only difference will be a small change in performance. For example, we can modify the previous example by putting the source in the process called ‘p1’.
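
For example, the specification above can be rewritten as follows, with the source ‘x’ associated with ‘p1’ rather than ‘p0’ (a hedged variant; functions f and g are unchanged):

multicore_specification = \
    [ # list of stream names and types
      [('x', 'i'), ('y', 'i')],

      [ # process p0 no longer owns the source 'x'
        {'name': 'p0', 'agent': f, 'inputs': ['x'], 'outputs': ['y']},

        # process p1 now owns the source 'x' in addition to reading 'y'
        {'name': 'p1', 'agent': g, 'inputs': ['y'], 'sources': ['x']} ] ]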

Threads

Any thread can extend a source stream by calling the function extend_stream:

extend_stream(procs, data, stream_name)

where procs is a dict containing information about processes, stream_name is the name of the stream, and data is the list by which the stream is extended. For example,

extend_stream(procs, data=[1, 10], stream_name='x')

extends the source called ‘x’ by [1, 10].

A thread indicates that no further data will arrive on a source by calling the function terminate_stream. For example,

terminate_stream(procs, stream_name='x')

signals that no further data will be added to the stream called ‘x’. Here is an example of the target of a simple thread:

Example of thread target

def thread_target(procs):
    extend_stream(procs, data=[1,10], stream_name='x')
    terminate_stream(procs, stream_name='x')

A thread executing this target extends a source stream called ‘x’ with [1, 10] and then signals that no further changes to the stream will occur.

procs

procs is a dict in which the keys are process names and the values are objects that contain information about the process. These objects are instances of the class MulticoreProcess and contain information such as the threads running in the process and the Python process itself. For example, procs[proc_name].threads and procs[proc_name].process are the list of threads and the process associated with the process name, proc_name.

For example, you can specify that the only thread (in addition to the main thread) that will be running in the process with name ‘p1’ is thread_0 as follows:

procs['p1'].threads = [thread_0]

Running processes

You start, join and terminate processes in four steps:

1. Get the list of processes and the dict procs by executing the function get_processes_and_procs.

processes, procs = \
    get_processes_and_procs(multicore_specification)

2. Specify the threads that put data into streams. For example:

thread_0 = threading.Thread(target=thread_target, args=(procs,))

3. For each process, specify the threads (if any) running in that process. For example:

procs['p1'].threads = [thread_0]

4. Start, join and terminate processes.

for process in processes: process.start()
for process in processes: process.join()
for process in processes: process.terminate()
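
Putting the four steps together with the specification and thread target shown earlier gives the following hedged end-to-end sketch. The agent bodies are illustrative and the import paths are assumptions; adjust them to your IoTPy installation. (On platforms that spawn rather than fork processes, run the steps under an if __name__ == '__main__': guard.)

import threading

# Assumed import paths; adjust them to your IoTPy installation.
from IoTPy.core.stream import Stream
from IoTPy.agent_types.op import map_element
from IoTPy.concurrency.multicore import (
    get_processes_and_procs, extend_stream, terminate_stream)

def f(in_streams, out_streams):
    # Add 10 to each value of the source stream 'x' and write it to 'y'.
    map_element(lambda v: v + 10,
                in_stream=in_streams[0], out_stream=out_streams[0])

def g(in_streams, out_streams):
    # Double each value of the stream 'y' and print the result.
    def double_and_print(v):
        result = 2 * v
        print(result)
        return result
    map_element(double_and_print,
                in_stream=in_streams[0], out_stream=Stream('discard'))

def thread_target(procs):
    # Feed the source 'x' with [1, 10], then signal that it is finished.
    extend_stream(procs, data=[1, 10], stream_name='x')
    terminate_stream(procs, stream_name='x')

multicore_specification = \
    [ # Streams
      [('x', 'i'), ('y', 'i')],
      # Processes
      [ {'name': 'p0', 'agent': f,
         'inputs': ['x'], 'outputs': ['y'], 'sources': ['x']},
        {'name': 'p1', 'agent': g, 'inputs': ['y']} ] ]

# Step 1: get the processes and the dict procs.
processes, procs = get_processes_and_procs(multicore_specification)

# Step 2: create the thread that feeds the source.
thread_0 = threading.Thread(target=thread_target, args=(procs,))

# Step 3: run the thread in the process that owns the source 'x'
# (in this sketch the source is associated with 'p0').
procs['p0'].threads = [thread_0]

# Step 4: start, join and terminate the processes.
for process in processes: process.start()
for process in processes: process.join()
for process in processes: process.terminate()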

Termination Detection

A multiprocess program terminates execution at a point after terminate_stream() has been executed for each of its source streams and every agent is quiescent (not executing), waiting for additional values to be placed on its input streams. An agent does not modify its output streams while it is waiting to read values on its input streams, and a source stream is not modified after terminate_stream() is executed on it; so no stream is modified from this point onward and no agent takes further steps.

IoTPy detects when a multiprocess program terminates.

Next, let’s look at an example of how output queues are used to send data in streams from IoTPy processes. The following is the specification of a process named ‘p1’ with a single input stream called ‘result’, an agent function g, the positional arguments shown in the args list, a single output queue q, and a single thread called my_thread. The agent function g puts values into q, and other processes can then get values from q.

q = multiprocessing.Queue()
# Process p1
{'name': 'p1', 'agent': g, 'inputs': ['result'],
 'args': [MULTIPLICAND, EXPONENT, q, buffer, ptr],
 'output_queues': [q], 'threads': [my_thread]}
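
As a hedged sketch (the body is illustrative and the import paths are assumptions), an agent function g matching this specification might copy each value of the stream ‘result’ into q:

# Assumed import paths; adjust to your IoTPy installation.
from IoTPy.core.stream import Stream
from IoTPy.agent_types.op import map_element

def g(in_streams, out_streams, MULTIPLICAND, EXPONENT, q, buffer, ptr):
    # MULTIPLICAND, EXPONENT, buffer and ptr are unused in this sketch;
    # they are shown only to match the 'args' list above.
    def put_in_queue(v):
        # Copy each value of the input stream 'result' into the output
        # queue q so that non-IoTPy processes can read it.
        q.put(v)
        return v
    map_element(func=put_in_queue,
                in_stream=in_streams[0], out_stream=Stream('discard'))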

A non-IoTPy process can get values from an output queue in an IoTPy process until it gets a ‘_finished’ message as shown in the following example.

def print_data_from_output_queue(q):
    while True:
        v = q.get()
        if v == '_finished':
            break
        print(v)
# Thread
my_thread = threading.Thread(
    target=print_data_from_output_queue, args=(q,))

Keyword arguments of agent functions

Keyword arguments of an agent function are specified as illustrated in the following example.

# Process p0
{'name': 'p0', 'agent': f, 'inputs': ['x'], 'outputs': ['y'],
 'keyword_args': {'ADDEND': 10}}
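
A hedged sketch of a matching agent function (the import path is an assumption):

# Assumed import path; adjust to your IoTPy installation.
from IoTPy.agent_types.op import map_element

def f(in_streams, out_streams, ADDEND=0):
    # Add the keyword argument ADDEND to each value of the input stream 'x'.
    map_element(lambda v: v + ADDEND,
                in_stream=in_streams[0], out_stream=out_streams[0])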

Passing arrays to IoTPy multiprocessing programs

You can use multiprocessing.Array to pass data to an IoTPy multicore application. Agent functions can read and write to such arrays. For example:

def f(in_streams, out_streams, shared_array, shared_array_ptr):
    # Read from and write to shared_array and shared_array_ptr here.
    return

buffer = multiprocessing.Array('i', 1000)
ptr =  multiprocessing.Value('i', 0)
# Put data into buffer and update ptr.
data = [1, 2, 3, 5, 7, 11]
buffer[0:len(data)] = data
ptr.value = len(data)

# Specification of process p0
{'name': 'p0', 'agent': f, 'inputs':['x'], 'outputs': ['y'],
            'keyword_args' : {'shared_array' : buffer,
                              'shared_array_ptr' : ptr}, ...}

The code creating an IoTPy multiprocess program puts data into a buffer that it wants to share with the IoTPy program and sets ptr to point to the end of the data in the buffer. After the IoTPy program starts, the process called ‘p0’ can read and write buffer and ptr. When the program terminates, other processes can read and write buffer and ptr.
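
As a hedged, purely illustrative sketch (the computation and import path are assumptions), the agent function for ‘p0’ could read the data placed in the shared buffer; writing works the same way, by assigning to elements of shared_array and updating shared_array_ptr.value:

# Assumed import path; adjust to your IoTPy installation.
from IoTPy.agent_types.op import map_element

def f(in_streams, out_streams, shared_array, shared_array_ptr):
    def add_buffered_values(v):
        # Read the data currently in the shared buffer and add its sum
        # to each value arriving on the input stream 'x'.
        return v + sum(shared_array[:shared_array_ptr.value])
    map_element(add_buffered_values,
                in_stream=in_streams[0], out_stream=out_streams[0])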

Examples of multicore applications are found in IoTPy/IoTPy/tests/multicore_test.py. Next we look at distributed applications using pub/sub (publish/subscribe) mechanisms.