Distributed Applications

See IoTPy/IoTPy/multiprocessing/distributed.py.

You can build distributed applications by connecting IoTPy code to messaging frameworks. Here we describe connectors from IoTPy to Pika — Python code that implements a message broker called RabbitMQ. You can build connectors to other message brokers — such as ActiveMQ, and Kafka — by modifying the connector to Pika.

Sending messages from iotpy to a message broker

You can send messages from IoTPy to a message broker in two ways:

  1. Publish stream: Use a sink agent that publishes a stream to the message broker

  2. Publish queue: Use a thread that publishes messages that it gets from a queue.

PUBLISH stream: Copying a stream to a message broker

# Agent that copies a stream to a message broker
def f(in_streams, out_streams):
    publisher = PikaPublisher(
        routing_key='temperature',
        exchange='publications', host='localhost')
    sink_list(publisher.publish_list, in_streams[0])

IoTPy/IoTPy/concurrency/ has a class called PikaPublisher. The parameters, routing_key, exchange and host, are described in the Pika documents. publish_list is a method of the PikaPublisher class; this method publishes a list on a message broker.

sink_list(publisher.publish_list, in_streams[0])

creates a sink agent which continuously gets new segments in_streams[0] and then publishes the segments by calling publish_list.

publish queue in a Thread

def publish_data_from_queue(q):
    publisher = PikaPublisher(
        routing_key='temperature',
        exchange='publications', host='localhost')
    while True:
        v = q.get()
        if v == '_finished': break
        else: publisher.publish_list([v])

Receiving Messages from a message broker

You receive messages by creating a source thread as illustrated in the following example which extends a stream called ‘x’ with published data.

# Source thread target for source stream named 'x'.
def h(proc):
    def callback(ch, method, properties, body):
        proc.copy_stream(data=json.loads(body), stream_name='x')
    pika_subscriber = PikaSubscriber(
        callback, routing_key='temperature',
        exchange='publications', host='localhost')
    pika_subscriber.start()

PikaSubscriber is a class in IoTPy/IoTPy/concurrency/. The parameters, callback, routing_key, exchange and host are described in the Pika documents. The callback function is called when RabbitMQ detects that a message has been published with this routing key and exchange. This particular callback function merely extends the stream called ‘x’ with the message.