I/O operations: EXTERNAL SOURCES OF DATA

Sources: no input STREAM, single output STREAM

See IoTPy/IoTPy/agent_types/source.py

A source of data may be a sensor, Twitter, a file, or a function such as a random number generator.  A source agent may have to suspend execution while it interacts with the source of data. When reading from a sensor or Twitter, your execution may suspend while the external source is getting data. Or you may want a source agent to read a block of lines in a file, add the lines to a stream, suspend execution for a short while and then read the next block. Each source agent executes in a separate thread. So, if the source suspends execution, other threads in the process can continue.

A few source functions are described below.

Twitter

The IoTPy package uses tweepy to put data (Tweets) from Twitter on to a stream. You must have a Twitter account and sign on to Twitter and get credentials as described in the tweepy introduction. The credentials are: consumer_key, consumer_secretaccess_token, access_token_secret

You create an agent that listens to Twitter and puts tweets on a stream by calling the function twitter_to_stream. The parameters, trackwords, is a list of words that Twitter will track for you. If trackwords, is [ "Trump" , "Clinton"] then Twitter will send you the tweets which contain the words "Trump" or "Clinton". 

During testing you may want to stop the thread after it appends a specified number of tweets to the stream. After testing is complete, you typically want the thread to execute for ever. If you specify the parameter num_steps to be a positive integer then the thread will stop after num_steps tweets have been appended to the stream. If num_steps is None then the thread will execute for ever.

twitter_to_stream causes tweets to be placed on the specified out_stream.

twitter_to_stream(
   consumer_key, consumer_secret,
   access_token, access_token_secret,
   trackwords, out_stream, num_steps=0)

An example of a call to the function is:

s = Stream()
twitter_to_stream(
   consumer_key="insert_your_value_here", 
   consumer_secret="insert_your_value_here",
   access_token="insert_your_value_here",
   access_token_secret="insert_your_value_here",
   trackwords=['Trump', 'Clinton'], 
   out_stream=s, num_steps=10)
Slide10.jpg

Tweets with the words "Trump" or "Clinton" will be placed on stream s. The objects placed on stream s are Python dicts (dictionaries) and not character strings. Working with dicts is easier than working with text strings in many cases.

 

SOUrce_function

source_func_to_stream(
        func, out_stream, time_interval=0, num_steps=None, 
        window_size=1, state=None, name='source_f', 
        **kwargs)

This source function returns a thread which puts the result of a call to the function, func, on the stream outstream. Each call to func returns a value to be appended to out_stream. The thread executes a sequence of steps where time_interval is the number of seconds that the thread sleeps between successive steps. 

At each step, func is called a window_size number of times. Usually, window_size is 1, and each step puts the result of a single call to func on the stream called stream_name. You may want to simulate sensors that provide data in bursts, and in that case you should set window_size to the size of the burst. For example, you may want to simulate a sensor that outputs 256 successive measurements on each sensor output message; in this case set window_size to 256.

state, and kwargs are optional parameters of func and are exactly the same as in the other types of agents described earlier. name is the name of the agent created by the call; the name is optional and is helpful in debugging.

If num_steps is None the thread runs for ever. If num_steps is a positive integer then the source thread stops execution after num_steps steps. This is helpful because you may want to execute a thread for only a short time during debugging.

Examples

stream of uniform random numbers

import random
s = Stream('random')
random_thread = source_func_to_stream(
        func=random.random, out_stream=s, 
        time_interval=0.001, num_steps=10)
# A value generated by random.random() is appended to stream s.

The code (above) creates a thread that executes 10 steps where the thread sleeps 0.001 seconds between successive steps. At each step, the thread calls the function random.random once (since window_size = 1), and puts the value returned by the call on stream s.

stream of random integers between 1 and 100

The starting of threads is not shown in the code in the following examples. This example illustrates the use of additional keyword arguments , a and b, of the function func, which is random.randint in the example.

randint_thread = source_func_to_stream(
        func=random.randint, out_stream=s, 
        time_interval=0.005, num_steps=20, window_size=10,
        a=1, b=100 # parameters of randint
        )

The thread executes 20 steps and then terminates. The thread sleeps 0.005 seconds between successive steps. At each step, the thread calls the function random.randint(a=1, b=100) a total of 10 (window_size) times, and at each step it appends the 10 values returned by these calls on stream s.

stream of average of previous random numbers

The next example illustrates a thread with memory (or state). The previous two examples had no memory.

def average_of_random(state):
   # state is a 2-tuple (total, count)
   # total is the sum of values so far
   # count is the number of values
   total, count = state
   total += random.random()
   count += 1
   # returns next output, new state
   return total/float(count), (total, count)

avg_thread = source_func_to_stream(
    func=average_of_random, out_stream=s,
    time_interval=0.05, state=(0.0,0))

Since source_func_to_stream does not have a value for num_steps, the thread executes for ever or until an exception is raised. The thread sleeps for 0.05 seconds between successive steps. Since source_func_to_stream does not have a value for window_size, the thread calls the encapsulated function, average_of_random, once on each step.

The call to source_func_to_stream has a parameter called state. The state of func (i.e. average_of_random) is (total, count) where total is the sum of the random numbers generated so far, and count is the number of random numbers generated. The initial state is (0.0, 0). The encapsulated function, average_of_random, has a single argument which is the state and it returns two values: (1) the new element total float(count) to be appended to the output stream and (2) the new state. On each step, the thread appends the element total float(count) to the stream average_of_random.

example: clock_streams

def clock_ticks(state, max_value=2**30): 
   return state, (state+1)%max_value
clock_thread = source_func_to_stream(
        func=clock_ticks, stream=s,
        time_interval=0.001, num_steps=32, 
        state=0, max_value=4)

Starting clock_thread runs the thread which appends 0, 1, 2, 3, 0, 1, 2, 3, 0, 1, 2, 3, 0.. to stream s.This example illustrates a source with a state and with an additional keyword argument, max_value.

The thread executes 32 steps and terminates. The thread sleeps for 0.001 seconds between successive steps. The thread appends values to a stream s. At each step the thread calls the encapsulated function clock_ticks. Since state (one of the parameters of source_func_to_stream) is specified, the thread has state. The function clock_ticks returns two values: a value to be appended to the output stream and the next state. The next state is (state+1)%max_value, and the value appended to the output stream is state; thus the sequence of values in the output stream is 0 (the initial state specified in source_function), 1, 2,..., max_value, 1, 0, 1, 2, ... Each value is output appended to output stream s every 0.001 seconds.

source_file: File to stream

The following function returns a thread that reads the next line from a file specified by filename; it then calls function func which is applied to this line; then the value returned by func is appended to out_stream. The other parameters - such as window_size, state, name - are the same as in any other source.

source_file_to_stream(
        func, out_stream, filename, time_interval=0,
        num_steps=None, window_size=1, state=None, 
        name='source_file', **kwargs)

EXAMPLE

file_thread = source_file_to_stream(
        func=lambda x: 2*int(x), out_stream=s, 
        filename='test_file.dat', time_interval=0.1)

In this example, test_file.dat is a file with an integer on each line. 

The code (above) creates a thread, file_thread. Since num_steps is not specified, the thread executes until the entire file is read. The thread sleeps for 0.1 seconds between successive steps. Since window_size is not specified, it's default value (1) is used. At each step, one line (which is assumed to be a number) of the file called "test_file.dat" is read, and func is applied to the number (i.e. the number is doubled) and the result is placed on the stream s.

source_list: List to stream

source_list_to_stream(
        in_list, out_stream, time_interval=0, num_steps=None,
        window_size=1, name='source_list_to_stream')

This function returns a thread which reads the elements of the list in_list and puts the elements on the stream out_stream.

Example

list_thread = source_list_to_stream(
    in_list=range(10), out_stream=s, 
    time_interval=0.01)

Starting the thread list_thread causes the values in in_list (i.e. 0, 1, … , 9) to be appended to stream s.

Code Repository

The code for source is in agent_types/source.py and tests are in tests/source_test.py.

next: agent types. sinks. single input, no output

previous: agent types. split. single input, multiple outputs