Source code for examples_element_wrapper

"""This module contains examples of the 'element' wrapper.
A function that takes a single element of an input stream
and generates a single element of an output stream is an
example of a function that is wrapped by the 'element'
wrapper to create a function on streams.

The module has the following parts:
(1) op or simple operator:
        single input, single output
(2) sink:
        single input, no outputs
(3) source:
        no input, multiple outputs
(4) split:
        single input, multiple output
(5) merge:
        multiple input, single output
(6) general case:
        multiple input, multiple output

All of the first 5 cases are specializations of
the general case; however, the syntactic sugar they
provide can be helpful.

For each of the above cases we first consider agents
that are stateless and then consider agents with state.

"""
## Commenting this section that appends everything on the path
## if __name__ == '__main__':
##     if __package__ is None:
##         import sys
##         from os import path
##         sys.path.append( path.dirname( path.dirname( path.abspath(__file__) ) ) )

from Stream import Stream, _no_value, _multivalue
from Operators import stream_func, stream_agent
import json
import numpy as np

#######################################################
#            PART 1
#   SINGLE INPUT STREAM, SINGLE OUTPUT STREAM.
#######################################################

#______________________________________________________
# PART 1A: Stateless
#______________________________________________________
# Single input, single output, stateless functions

# Inputs to the functions:
#  element : object
#            element of the input stream

# Returned by the functions:
# element : object
#           element to be placed on the output stream.
#______________________________________________________


#            EXAMPLE 1
#
# SPECIFICATION:
# Write a function, square_stream, that has a single 
# input stream and that returns a stream whose elements
# are the squares of the elements of its input
# stream.

# If y = square_stream(x)
# and x is a stream with initial values [1, 3, 5, ...] then
# y must be a stream with initial values [1, 9, 25, ...]
# See main()
#
# HOW TO DEVELOP THE STREAMING PROGRAM.

# First step:
# Write a function that returns squares of
# its single input value.
[docs]def square(v): return v*v
# Second step: # Wrap the above function, square, using wrapping # function stream_func to obtain the desired # function square_stream.
[docs]def square_stream(stream): return stream_func( inputs=stream, f_type='element', f=square, num_outputs=1)
# stream_func is the wrapper. # f_type specifies how the function f is to be wrapped. # The initial examples have f_type='element' which # assumes that f operates on single elements of input streams # and produces single elements of output streams. # num_outputs is the number of output streams. We begin # with examples in which num_outputs=1. # inputs is set to the parameter of square_stream, and # so inputs=stream. # In this initial set of examples, the function has # a single input stream. So inputs is the parameter # You can also obtain a stream y1 that squares elements of # a stream x using stream_func directly, without defining # the function square_stream first: # y1 = stream_func( # inputs=x, f_type='element', f=square, num_outputs=1) # See main() # # EXAMPLE 2 # # SPECIFICATION: # Write a function, double_stream, that has a single # input stream and that returns a stream whose elements # are twice the values of the elements of its input # stream. # # If z = double_stream(x) # and x is a stream with initial values [1, 3, 5, ...] # then z must be a stream with initial values [2, 6, 10, ...] # See main() # # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function double that returns the double of # its single input value.
[docs]def double(v): return 2*v
# Second step: # Wrap the above function, double, to obtain # the desired function double_stream.
[docs]def double_stream(stream): return stream_func( inputs=stream, f_type='element', f=double, num_outputs=1)
# We could also have obtained the desired stream z1 # using stream_func directly, # z1 = stream_func( # inputs=x, f_type='element', f=double, num_outputs=1) # EXAMPLE 3 # Example of function composition. # Generate a stream w1 that doubles the squares of the elements # of stream x # w1 = double_stream(square_stream(x)) # If x is a stream [1, 3, 5, ...] then # w1 is a stream [2, 18, 50, ...] # # Generate a stream w2 that squares twice the elements # of stream x # w2 = square_stream(double_stream(x)) # If x is a stream [1, 3, 5, ...] then # w2 is a stream [4, 36, 100, ...] # EXAMPLE 4 # Illustrating use of _no_value. # SPECIFICATION: # Write a function, discard_odds, that has a single # input stream and that returns a single stream whose elements # are the same as its input stream except that odd # numbers are discarded. # # If v = discard_odds(x) # and x is a stream [1, 2, 3, 4, 5, 6, ...] then # v must be the stream [2, 4, 6,.....] # # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function with a single input that returns # its single input value if the input value is even # and returns _no_value otherwise.
[docs]def even(v): if not v%2: return v else: return _no_value
# # _no_value is a special object that is not placed # in an output stream. # # Second step: # Wrap the above function, even, to obtain # the desired function discard_odds.
[docs]def discard_odds(stream): return stream_func( inputs=stream, f_type='element', f=even, num_outputs=1)
# Explanation of need for _no_value # # Consider the following example that returns None rather than # _no_value for odd numbers.
[docs]def even_1(v): if not v%2: return v else: return None
[docs]def discard_odds_1(stream): return stream_func( inputs=stream, f_type='element', f=even_1, num_outputs=1)
# # If v_1 = discard_odds_1(x) and # x is a stream [1, 2, 3, 4, ...] then # v_1 is a stream[None, 2 None, 4, ...] which is not the # same as [2, 4, ...] # EXAMPLE 5 # Illustrating use of _multivalue. # If Python function f returns _multivalue(l) # where l is a list then the agent appends each # element of l to the agent's output stream. # For example, if f returns _multivalue([3, 4]) # then 3 and then 4 will be appended to the # agent's output stream. # Note that if f returns [3, 4] then the list # [3, 4] will be appended to the agent's output # stream; this is not the same as appending 3 and # then 4. # SPECIFICATION: # Write a function evens_and_halves with a single # input stream that returns a single stream in # which odd values in the input stream are discarded # and even values and half their values appear in # the output stream. # # If u = evens_and_halves(x) and # x is a stream [1, 2, 3, 4, 5, 6, ..] then # u must be the stream [2, 1, 4, 2, 6, 3, .....] # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function with a single input v and that # returns _multivalue([v, v/2] if v is even # and returns _no_value otherwise.
[docs]def even2(v): if not v%2: return _multivalue([v, v/2]) else: return _no_value
# Second step: # Wrap the above function, even2, to obtain # the desired function evens_and_halves.
[docs]def evens_and_halves(stream): return stream_func( inputs=stream, f_type='element', f=even2, num_outputs=1)
# Illustration of the need for _multivalue # As a contrast to even2 consider the following:
[docs]def even3(v): if not v%2: return [v, v/2] else: return _no_value
[docs]def evens_and_halves_3(stream): return stream_func( inputs=stream, f_type='element', f=even3, num_outputs=1)
# If t = evens_and_halves_3(x) # and x is a stream [1, 2, 3, 4, 5, 6, ..] then # t is the stream [[2, 1], [4, 2], [6, 3], .....] which is # different from [2, 1, 4, 2, 6, 3, ...] # EXAMPLE 6 # Illustrating use of local variables. # SPECIFICATION: # Write a function multiply_elements_in_stream # with two input parameters: # (1) stream: a stream of numbers and # (2) multiplier: a number # The function returns a single stream whose # elements are multiplier times the corresponding # elements of the input stream. # If s = multiply_elements_in_stream(stream=x, multiplier=3) # and x is a stream [1, 2, 3, 4, ...] then # s must be the stream [3, 6, 9, 12,...] # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function mult(v) that returns multiplier times # v, where multiplier is a constant specified outside # mult. # def mult(v): return multiplier*v # Second step: # Wrap the function, mult, to obtain # the desired function multiply_elements_in_stream. # Define mult inside the definition of # multiply_elements_in_stream so that the parameter # multiplier is available to function mult.
[docs]def multiply_elements_in_stream(stream, multiplier): def mult(v): return multiplier*v return stream_func( inputs=stream, f_type='element', f=mult, num_outputs=1)
# EXAMPLE 7 # Another example illustrating use of local variables. # SPECIFICATION: # Write a function boolean_of_values_greater_than_threshold # with two input parameters: # (1) stream: a stream of numbers and # (2) threshold: a number # The function returns a single stream whose # elements are True if the corresponding # elements of the input stream exceed threshold and # are False otherwise. # If # r = boolean_of_values_greater_than_threshold(stream=x, threshold=4) # and x is a stream [1, 20, 31, 4, ...] then # r must be the stream [False, True, True, False,...] # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function value_greater_than_threshold(value) # that returns True if value exceed threshold # where threshold is a constant specified outside # value_greater_than_threshold. # def value_greater_than_threshold(value): # return value > threshold # Second step: # Wrap the function, value_greater_than_threshold, to # obtain the desired function, # boolean_of_values_greater_than_threshold. # Define value_greater_than_threshold inside the definition # of boolean_of_values_greater_than_threshold so that the # parameter threshold is available to function # boolean_of_values_greater_than_threshold.
[docs]def boolean_of_values_greater_than_threshold(stream, threshold): def value_greater_than_threshold(value): return value > threshold return stream_func( inputs=stream, f_type='element', f=value_greater_than_threshold, num_outputs=1)
#______________________________________________________ # PART 1B #______________________________________________________ # Single input, single output, stateful functions # Inputs to the functions: # element : object # element of the input stream # state : state of the agent before the transition # Returned by the functions: # element : object # element to be placed on the output stream. # state : object # The next state of the agent. # # The form of the wrapper in these examples is: # stream_func( # inputs=stream, # The name of the input stream # f_type='element', # f=g, # The name of the function that is wrapped # num_outputs=1, # The number of outputs # state=initial_state # specifies the initial state # ) #______________________________________________________ # EXAMPLE 1 # An example illustrating state. # SPECIFICATION: # Write a function cumulative_stream with a single # input stream and that returns a single stream where # the j-th element of the output stream is the sum # of the first j elements of its input stream. # If b = cumulative_stream(stream=x) # and x is a stream [1, 2, 3, 4, ...] then # b must be the stream [1, 3, 6, 10, ....] # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function cumulative_sum given below.
[docs]def cumulative_sum(v, cumulative): """ Parameters ---------- v : number The next element of the input stream of the agent. cumulative: number The state of the agent. The state is the sum of all the values received on the agent's input stream. Returns ------- (cumulative, cumulative) cumulative : number The state after the transition, i.e., the sum of values received on the agent's input stream including the value received in this transition. """ cumulative += v return (cumulative, cumulative)
# Second step: # Wrap the function, cumulative_sum, to # obtain the desired function, cumulative_stream. # Since the function has state, the wrapper specifies # the initial state: state=0.
[docs]def cumulative_stream(stream): return stream_func( inputs=stream, f_type='element', f=cumulative_sum, num_outputs=1, state=0 # The initial state )
# EXAMPLE 2 # Another example illustrating state. # SPECIFICATION: # Write a function average_stream that has a # single input stream and returns a stream where # the j-th element of the output stream is the average # of the first j elements of its input stream. # If c = cumulative_stream(stream=x) # and x is a stream [1, 2, 3, 4, ...] then # c must be the stream [1.0, 1.5, 2.0, 2.5, ....] # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function average given below.
[docs]def average(v, state): """ Parameters ---------- v : number The next element of the input stream of the agent. state: (n, cumulative) The state of the agent where n : number The value of the next element in the agent's input stream. cumulative : number The sum of the values that the agent has received on its input stream. Returns ------- (mean, state) mean : floating point number The average of the values received so far by the agent state : (n, cumulative) The new state of the agent. """ n, cumulative = state n += 1 cumulative += v mean = cumulative/float(n) state = (n, cumulative) return (mean, state)
# Second step: # Wrap the function, average, to # obtain the desired function, average_stream. # Since this function has a state, the wrapper # specifies the initial state: state=(0,0.0).
[docs]def average_stream(stream): return stream_func( inputs=stream, f_type='element', f=average, num_outputs=1, state=(0, 0.0) # The initial state # Initially n = 0, cumulative = 0.0 )
####################################################### # PART 2: SINKS # ONE OR MORE INPUT STREAM, NO OUTPUT STREAMS. ####################################################### #______________________________________________________ # PART 2A: Stateless # Single input, no output, stateless functions # Inputs to the functions: # element : object # element of the input stream # Returned by the functions: # None # # The form of the wrapper in these examples is: # stream_func( # inputs=stream, # The name of the input stream # f_type='element', # f=g, # The name of the function that is wrapped # num_outputs=0 # The number of outputs # ) #______________________________________________________ # EXAMPLE 1 # SPECIFICATION: # Write a function print0 that has a single input # stream and that returns None. The function # prints the values of its input stream. # If the input stream has initial values [3, 5, ...] # and the stream name is 'x' then the function should # print: # x : 3 # x : 5 # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function p0 with a single parameter, v, where # p0 prints the name of a stream (specified outside p0) # and the value of v. # def p0(v): # print '{0} : {1}'.format(stream.name, v) # Second step: # Wrap the function, p0, to obtain the desired function, # print0. Define p0 inside the definition # of print0 so that p0 can access the parameter, # stream of print0. # This function doesn't have to keep track of its past, # and so it has no state. Therefore, the wrapper # stream_func does not specify a state.
[docs]def print0(stream): def p0(v): print '{0} : {1}'.format(stream.name, v) return stream_func( inputs=stream, f_type='element', f=p0, num_outputs=0)
#______________________________________________________ # PART 2A: Stateful # Single input, no output, with state # EXAMPLE 2 # Illustrates state. # SPECIFICATION: # Write a function print_stream that has a single input # stream and that returns None. The function # prints the values with the indexes of its input stream. # If the input stream has initial values [3, 5, ...] # and the stream name is 'x' then the function should # print: # x[0] = 3 # x[1] = 5 # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # The function has to keep track of the number of values # that have already been printed; so we need a state. # Write a function print_element with two parameters, v, # the value in a stream, and count --- the number of elements # printed so far. The function returns the new state, i.e., # the new value of count. The function uses a variable stream # specified outside the function. #def print_element(v, count): # print '{0}[{1}] = {2}'.format(stream.name, count, v) # return (count+1) # Second step: # Wrap the function, print_element, to obtain the desired function, # print_stream. Define print_element inside the definition # of print_stream so that print_element can access the parameter, # stream of print_stream. # This function has a state, namely count, and so the wrapper # specifies its initial value: 'state=0'. # EXAMPLE 3 # SPECIFICATION: # Write a function stream_to_file that has two # parameters, a single input stream and a filename. # The function returns None. The function # appends the json representations of the values # of its input stream into the file with name filename. # If the input stream has initial values [3, 5, ...] # and file is initially empty then the file should # have the json representations of 3 and 5, each on a # separate line. # (Note that if the file is not initially empty, then # the stream values are appended to the end of the # nonempty file.) # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function write_value_to_file(v) that # appends the json representation of v to the # file with name filename with each append on # a new line. # Second step: # Wrap the function, write_value_to_file, to # obtain the desired function, stream_to_file. # Define write_value_to_file inside the definition # of stream_to_file so that stream_to_file can # access the parameter, filename.
[docs]def stream_to_file(stream, filename): def write_value_to_file(v): with open(filename, 'a') as input_file: input_file.write(json.dumps(v) + '\n') return stream_func( inputs=stream, f_type='element', f=write_value_to_file, num_outputs=0)
# EXAMPLE 4 # Illustrates synchronous merge of multiple inputs # in a sink (i.e., no output). # SPECIFICATION: # Write a function, print_sums, with a single parameter # which is a list of streams. The function returns None. # The function prints the sum of the k-th elements of # all its input streams for all k. # EXAMPLE 5 # For an illustration of a sink with asynchronous merge # of its inputs see Section 7. ####################################################### # PART 3: SOURCES # NO INPUT STREAMS, ONE OR MORE OUTPUT STREAMS ####################################################### #______________________________________________________ # PART 3A: Stateless # No inputs, one or more outputs, stateless functions # Illustrates the use of call_streams and illustrates # that append and extend of a stream are analogous to # the same operations on lists. #______________________________________________________ # EXAMPLE 1 # SPECIFICATION: # Write a function, timer, with three parameters: # output_stream, num_outputs and time_period where # stream is a Stream, num_outputs is a positive integer # and time_period is a positive number. # # The function generates a stream consisting # of the values [0, 1,..., num_outputs-1]. An integer # is output to the stream every time_period seconds. # THE STREAMING PROGRAM. # Illustrates that stream.append(i) is analogous # to l.append(i) where stream is a Stream and l # is a list. import time
[docs]def timer(output_stream, num_outputs, time_period): """ Parameters ---------- stream: Stream num_outputs: int, positive time_period: int or float, positive """ for i in range(num_outputs): output_stream.append(i) time.sleep(time_period)
# EXAMPLE 2 # SPECIFICATION: # Write a function, rand, with three parameters: # output_stream, num_outputs, time_period. # where output_stream is a stream, num_outputs is # a nonnegative number and time_period is an # optional positive number. # The function generates a stream of # num_outputs random numbers. If time_period # is provided, a random number is appended # to the stream periodically with the period # time_period. If time_period is not provided # random numbers are appended to the stream # continuously. # THE STREAMING PROGRAM. import time import random
[docs]def rand(output_stream, num_outputs, time_period=0): """ Parameters ---------- output_stream: Stream num_outputs: int, positive time_period: int or float, positive """ if not time_period: for _ in range(num_outputs): output_stream.append(random.random()) else: for _ in range(num_outputs): output_stream.append(random.random()) time.sleep(time_period)
# EXAMPLE 3 # SPECIFICATION: # Write a function, file_to_stream, with # three parameters: filename, output_stream # and time_period (optional) # The function reads a file called filename. # The file has json representations of objects, # with one or more representations per line. # The function appends the objects in the file, # to the stream. Objects from one line of the file # are appended the stream every time_period seconds # if time_period is specified. If time_period # is not specified, the function appends objects # from the file to the stream continuously. # THE STREAMING PROGRAM. import time
[docs]def file_to_stream(filename, output_stream, time_period=0): """ Parameters ---------- filename: str output_stream: Stream time_period: int or float, nonnegative """ with open(filename, 'r') as output_file: for line in output_file: values = [json.loads(v) for v in line.split()] output_stream.extend(values) if time_period: time.sleep(time_period)
# EXAMPLE 4 # Illustrates the use of call_streams # SPECIFICATION: # Write a function, single_stream_of_random_numbers, # that returns a single stream of random numbers. # A random number is appended to the output stream # when the parameter timer_stream is modified. # Note that in the wrapper, stream_func, the # parameter call_streams is a LIST of streams, and # so, the correct code is: # call_streams=[timer_stream] # not: # call_streams=timer_stream # THE STREAMING PROGRAM.
[docs]def single_stream_of_random_numbers(timer_stream): return stream_func( inputs=None, f_type='element', f=random.random, num_outputs=1, call_streams=[timer_stream])
####################################################### # PART 4: SPLIT # SINGLE INPUT STREAM, TWO OR MORE OUTPUT STREAMS. ####################################################### #______________________________________________________ # PART 4A: Stateless # Single input, two or more outputs, stateless functions # A python function with a single input and a tuple of # outputs is wrapped to produce a function with a single # input stream and a list of output streams #______________________________________________________ # EXAMPLE 1 # SPECIFICATION: # Write a function, square_and_double_stream, with a # single parameter: an input stream. The function returns # a list of two streams where the elements of the first # output stream are squares of the elements of the input # stream and the elements of the second output stream are # twice those of the input stream. If the input is the # stream [0, 1, 2, 3, ...] then the function returns a list # of two streams the first of which is [0, 1, 4, 9, ...] # and the second is [0, 2, 4, 6, ..] # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function, square_and_double with a single parameter, # a number. The function returns a tuple of two values, the # square and double of the input.
[docs]def square_and_double(m): return (m*m, 2*m)
# Second step: # Wrap the function, square_and_double, to # obtain the desired function, square_and_double_stream.
[docs]def square_and_double_stream(stream): return stream_func( inputs=stream, f_type='element', f=square_and_double, num_outputs=2 #Two output streams )
# EXAMPLE 2 # SPECIFICATION: # Write a function, exp_mult_div_stream, with four # parameters: stream, exponent, multiplier, and # divisor where the last three parameters are numbers. # The function returns a list of three streams where # the elements of the streams are the elements of the # input stream raised to exponent, multiplied by # multiplier and divided by divisor, respectively. # If the input stream is [0, 1, 2, 3, ...] and exponent # is 3, multiplier is 10, and divisor is 0.25 then the # function returns a list of three streams: # [0, 1, 8, 27, ...], [0, 10, 20, 30, ...], and # [0, 4, 8, 12, ...] # HOW TO DEVELOP THE STREAMING PROGRAM. # Wrap the function (see below) exp_mult_div_number
[docs]def exp_mult_div_stream(stream, exponent, multiplier, divisor): def exp_mult_div_number(n): return [n**exponent, n*multiplier, n/divisor] return stream_func(inputs=stream, f_type='element', f=exp_mult_div_number, num_outputs=3 # Returns list of 3 streams. )
# EXAMPLE 3 # Illustrates use of _no_value # SPECIFICATION: # Write a function, even_odd_stream, with one parameter: stream. # The function returns a list of two streams, the first containing # the even values of the input stream, and the second containing # the odd values. # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function, even_odd with a single parameter, # a number. The function returns a tuple with 2 values, # that will be inserted into the two output streams of the # wrapped function. even_odd returns (_no_value, m) if m # is even, because the odd stream gets no value and the even # stream gets m. Symmetrically, even_odd returns (m, _no_value) # if m is odd.
[docs]def even_odd(m): if m%2: return [_no_value, m] else: return [m, _no_value]
# Second step: # Wrap the function, even_odd, to get the desired function.
[docs]def even_odd_stream(stream): return stream_func(inputs=stream, f_type='element', f=even_odd, num_outputs=2 # Returns list of 2 streams. )
#______________________________________________________ # PART 4B: Stateful # Single input, two or more outputs, stateful functions # Write a python function with two inputs --- a stream element # and a state --- and that returns a tuple of stream # elements and the next state. Wrap the function to produce a # function with a single input stream and a list of output # streams. #______________________________________________________ # EXAMPLE 1 # SPECIFICATION: # Write a function that has a single input stream where the # elements of the stream are tuples: # (sensor name, time, sensor reading) # The sensor name is either 'temperature' or 'humidity' # The time is a positive integer. The sensor reading is # a number where the temperature reading is greater than -274, # and the humidity reading is non-negative. # The function returns two output streams, one for temperature # and one for humidity. If the temperature input stream has # a value that is less than DELTA away from its previous output, # where DELTA is a constant parameter, then that value is not # placed on the output stream; if the value exceeds DELTA then # it is placed on the output stream. Similarly for humidity data. # The parameters of the function are the input stream and DELTA. # The function returns a list of two output streams, one for # temperature and the other for humidity. # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # The state of the agent is the last value output on each of # the temperature and humidity streams. The state is represented # by a tuple of 2 numbers. # Write a function t_and_h with two parameters: # (1) msg: a 3-tuple (sensor_name, time, reading), and # (2) a state (last_temperature, last_humidity) # The function returns: # (1) A 2-tuple representing the next outputs on the temperature # and humidity streams, and # (2) the next state. # The function reads a constant DELTA defined outside the function. # Second step: # Wrap the function, t_and_h, to get the desired function. # Set the initial state to be a 2-tuple.
[docs]def temperature_and_humidity_streams(stream, DELTA): def t_and_h(msg, state): sensor_name, time, reading = msg index = 0 if sensor_name == 'temperature' else 1 next_output = [_no_value, _no_value] next_state = state if abs(state[index] - reading) > DELTA: next_output[index] = msg state[index] = reading return (next_output, next_state) return stream_func(inputs=stream, f_type='element', f=t_and_h, num_outputs=2, # Returns list of 2 streams. state= [-274, -1] # Initial state )
####################################################### # PART 5: MERGE # TWO OR MORE INPUT STREAMS, SINGLE OUTPUT STREAM. # ####################################################### #______________________________________________________ # PART 5A: Merge Stateless # Two or more inputs, single output, stateless functions. # A python function with a single input which is a list # and a single return value is wrapped to produce a # function with a list of input streams and a single output # stream. # # This merge is synchronous. The agent waits to receive # the j-th message in each input stream and then carries out # a computation on the list of j-th messages, one message per # input stream. The agent only outputs m messages where m # is the minimum number of messages in each of its input # streams. # # For asynchronous merges see asynch_element. #______________________________________________________ # EXAMPLE 1 # SPECIFICATION: # Write a function, mean_stream that has a single parameter, # a list of input streams. It returns a stream where the j-th # value of the stream is the mean of the j-th values of # its input streams. If the values of streams x, y, z # are [3, 5, 8], [1, 7, 2], and [2, 3] and the input to # the function is [x, y, z] then the output at this point # is [2.0, 5.0], i.e. (3+1+2)/3.0 and (1+7+3)/3.0 # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function that takes a list of numbers as input # and that returns its mean. np.mean() is such a function. # Second step: # Wrap the function np.mean() to get the desired function # mean_stream.
[docs]def mean_stream(list_of_streams): return stream_func(inputs=list_of_streams, f_type='element', f=np.mean, num_outputs=1)
#______________________________________________________ # PART 5B: Merge Stateful # Two or more inputs, single output, stateful functions. # A python function with two parameters: (1) a list # and (2) a state, and that returns two values: (1) a # single element of a stream and (2) a new state, is # wrapped to obtain a function that has as input a # list of streams and outputs a single stream. # # EXAMPLE 1 # SPECIFICATION: # Write a function, max_stream that has a single parameter, # a list of input streams, and that outputs a single stream. # The elements of all the input streams are nonnegative # numbers. # The elements of the output stream are 3-tuples: # (0) max value seen so far in all the inputs, # (1) index of the list in which max value appears, and # (3) the time, i.e., the count, at which the max value appeared. # For example, if the list of input streams is [x, y] where # x = [10, 9, 3, 0, 7, 8, 10] and y = [8, 8, 4, 9, 12, 2, 3] # then the output stream would be [(10, 0, 0), (12, 1, 4)] because # x[0]=10 and y[4]=12 are the max values seen, and x is the # zeroth element of the input list, while y is the first element # of the input list. # First step: # The state of the computation is a 2-tuple: # (0) the previous max value, # (1) the current time (count). # Write a function, max_with_index, # with two parameters: a list of numbers and a state. # The function returns a 2-tuple: (msg, next_state). # At each step, the current time is incremented by 1. # msg is _no_value to indicate no message or is the # 3-tuple (current max, current max index, current time).
[docs]def max_with_index(list_of_numbers, state): previous_max, current_time = state current_max = max(list_of_numbers) current_time += 1 if previous_max >= current_max: msg = _no_value state = (previous_max, current_time) else: current_max_index = list_of_numbers.index(current_max) msg = (current_max, current_max_index, current_time) state = (current_max, current_time) return (msg, state)
# Second step: # Wrap the function, max_with_index, to get the desired function # max_stream.
[docs]def max_stream(list_of_streams): return stream_func(inputs=list_of_streams, f_type='element', f=max_with_index, num_outputs=1, state=(-1, -1) # Initial (max, time) )
####################################################### # PART 6: MANY TO MANY # TWO OR MORE INPUT STREAMS, TWO OR MORE OUTPUT STREAMS. # ####################################################### #______________________________________________________ # PART 6A: Many to Many Stateless # A python function with a single parameter, a list # of stream elements and that returns a single value: a # list of stream elements is wrapped to obtain a function # that inputs a list of streams and that ouputs a list # of streams # # EXAMPLE 1 # SPECIFICATION: # Write a function, inrange_and_outlier_streams, with the # following parameters: a list of two streams and constants # A, B, DELTA. We call the two input streams x_stream and # y_stream. The elements of the input streams are numbers. # The function returns a list of two streams that we call # inrange_stream and outlier_stream. A pair of inputs # (x, y) from the input streams x_stream, y_stream is placed # in inrange_stream if abs(A*x+B - y) <= DELTA, and # in outlier_stream otherwise. # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function, inrange_and_outlier_values, with a # single parameter, a list of two values, where the values # are elements of streams. The function uses constants # A, B, DELTA which are defined outside the function. # The function returns a list of two values which will # be output on inrange_stream and outlier_stream. The # function returns _no_value to indicate that no value # is output. # Second step: # Wrap the function, inrange_and_outlier_values, to get # the function, inrange_and_outlier_streams.
[docs]def inrange_and_outlier_streams( x_and_y_streams, A, B, DELTA): def in_range_and_outlier_values(x_and_y): x, y = x_and_y if abs(A*x+B-y) > DELTA: return ([_no_value, x_and_y]) else: return ([x_and_y, _no_value]) return stream_func( inputs=x_and_y_streams, f_type='element', f=in_range_and_outlier_values, num_outputs=2)
#______________________________________________________ # PART 6B: Many to Many Stateful # Start with a python function with two parameters, a list # of stream elements and a state; the function returns a # list of stream elements and a new state. We wrap this # function to obtain a function that takes a list of # streams as input and that ouputs a list of streams. # # EXAMPLE 1 # SPECIFICATION: # Write a function similar to that of Section 5B # (Merge, Stateful), example 1. Write max_and_min # which takes a list of streams as input and outputs # two streams. One of the output streams contains the # max value seen so far, and the time at which the # this max value was received and the other contains the # same for the min value. # The elements of the input streams are nonnegative # numbers less than 10000. # See Section 5B, example 1. A difference with the # example in 5B is that the output streams contain # the names of streams rather than their indexes. # The wrapped function, max_and_min_with_names, # must have access to the stream names. So the # wrapped function is defined inside the stream # function, max_and_min_stream.
[docs]def max_and_min_with_names(list_of_numbers, state): previous_max, previous_min, current_time = state current_max = max(list_of_numbers) current_min = min(list_of_numbers) current_time += 1 if previous_max >= current_max: msg_max = _no_value else: max_index = list_of_numbers.index(current_max) max_stream_name = list_of_streams[max_index].name msg_max = (current_max, max_stream_name, current_time) previous_max = current_max if previous_min <= current_min: msg_min = _no_value else: min_index = list_of_numbers.index(current_min) min_stream_name = list_of_streams[min_index].name msg_min = (current_min, min_stream_name, current_time) previous_min = current_min state = (previous_max, previous_min, current_time) msgs_max_and_min = [msg_max, msg_min] return (msgs_max_and_min, state)
# Second step: # Wrap the function, max_with_index, to get the desired function # max_stream.
[docs]def max_and_min_stream(list_of_streams): def max_and_min_with_names(list_of_numbers, state): previous_max, previous_min, current_time = state current_max = max(list_of_numbers) current_min = min(list_of_numbers) current_time += 1 if previous_max >= current_max: msg_max = _no_value else: max_index = list_of_numbers.index(current_max) max_stream_name = list_of_streams[max_index].name msg_max = (current_max, max_stream_name, current_time) previous_max = current_max if previous_min <= current_min: msg_min = _no_value else: min_index = list_of_numbers.index(current_min) min_stream_name = list_of_streams[min_index].name msg_min = (current_min, min_stream_name, current_time) previous_min = current_min state = (previous_max, previous_min, current_time) msgs_max_and_min = [msg_max, msg_min] return (msgs_max_and_min, state) # Finished def max_and_min_with_names # Wrapper return stream_func(inputs=list_of_streams, f_type='element', f=max_and_min_with_names, num_outputs=2, state=(-1, 10000, -1) # Initial (max, min, time) )
####################################################### # PART 7: ASYNCHRONOUS MERGES # TWO OR MORE INPUT STREAMS, OPERATE ON ONE STREAN AT A TIME. # ####################################################### # Asynchronous merges operate on an element from one # of the input streams at a time. By contrast, a regular # merge waits until an element appears on each of its # input streams. If a regular merge has two input streams # and one has m elements and the other has n elements, then # the merge can only operate on min(m,n) elements because it # operates on the same number of elements on each of its input # streams. By contrast, an asynchronous merge could have m+n # outputs. # When a new element appears in any input stream that # element is processed. The function that is wrapped has # a single parameter: msg_content_and_stream_index_tuple # message_content, stream_index = msg_content_and_stream_index_tuple # where stream_index is a pointer to the input stream in which # the latest message appeared, and message_content is this latest # element. #______________________________________________________ # EXAMPLE 1 # PART 7 Example 1 Asynchronous merge. # Example with no output and no state # SPECIFICATION # Write a function, print_streams, that has a single input # a list of streams. The function returns None. The function # prints the elements that arrive on any of its input streams. # The function prints elements on any ONE stream in order; # however, the order in which elements from DIFFERENT streams # are printed is left unspecified. # HOW TO DEVELOP THE STREAMING PROGRAM. # First step: # Write a function, print_stream_asynch, with a # single parameter, msg_content_and_stream_index_tuple, where # message_content, stream_index = msg_content_and_stream_index_tuple. # This function returns None. # Second step: # Wrap the funtion, print_stream_asynch, to get the desired # function on streams. # EXAMPLE 2 # Example of asynchronous merge; single output, stateless # SPECIFICATION # Write a function, join_streams, which outputs # the elements arriving on its input streams as # they arrive.
[docs]def join_streams(list_of_streams): list_of_stream_names =[stream.name for stream in list_of_streams] def output_asynch(msg_content_and_stream_index_tuple): message_content, stream_index = msg_content_and_stream_index_tuple stream_name = list_of_stream_names[stream_index] return ((stream_name, message_content)) return stream_func( inputs=list_of_streams, f_type='asynch_element', f=output_asynch, num_outputs=1)
# EXAMPLE 3 # Example of asynchronous merge; single output, stateful # SPECIFICATION # Write a function, max_seen_across_all_streams, which outputs # the maximum of all the elements arriving on all its input # streams seen so far. The elements of the input streams are # nonnegative.
[docs]def max_of_all_inputs(list_of_elements, previous_max): current_max = max(max(list_of_elements), previous_max) msg = current_max state = current_max return (msg, state)
[docs]def max_seen_across_all_streams(list_of_streams): return stream_func( inputs=list_of_streams, f_type='asynch_element', f=max_of_all_inputs, num_outputs=1, state = -1 )
# EXAMPLE 4 # Example of asynchronous merge # multiple inputs, multiple outputs, stateful # SPECIFICATION # Write a function, max_and_min_seen_across_all_streams, which # has a single parameter, a list of streams, and which returns # a tuple of two streams. The first output stream contains # the maximum of all the elements that have arrived on all its input # streams seen so far. The elements of the input streams are # nonnegative. The second output stream contains the minimum of # all the elements that have arrived on all its input streams so far. # Values are placed on the output streams whenever the max and min # values change due to an arrival on any input stream. By contrast # (synchronous) merge waits to get k arrivals on each input stream # before making the k-th step of the computation.
[docs]def max_and_min_seen_across_all_streams(list_of_streams): def max_and_min_of_all_inputs( msg_content_and_stream_index_tuple, state): message_content, stream_index = msg_content_and_stream_index_tuple previous_max, previous_min = state current_max = max(message_content, previous_max) current_min = min(message_content, previous_min) next_msg = [_no_value, _no_value] if current_max > previous_max: next_msg[0] = current_max if current_min < previous_min: next_msg[1] = current_min next_state = (current_max, current_min) return (next_msg, next_state) return stream_func( inputs=list_of_streams, f_type='asynch_element', f=max_and_min_of_all_inputs, num_outputs=2, state = (-1, 1000) )
####################################################### # PART 8: CREATING AGENTS RATHER THAN STREAMS # ####################################################### # The only difference in this section is that the # input AND output streams are declared before calling # the wrapper that creates an agent. # Consider Section 1, example 1A, create a stream # that is the square of an input stream. The wrapper code # was: ## y_a = stream_func( ## inputs=stream, f_type='element', f=square, num_outputs=1) # The wrapper, stream_func, returned a stream, and a parameter # of the wrapper was num_outputs. # You can also define the stream y_a first, and call a # different wrapper called stream_agent, and pass y_a as an # argument to steam_agent, as in: ## y_a = Stream('squares of x') ## print_stream(y_a) # Now create the agent that populates y_a. ## stream_agent(inputs=x, outputs= y_a, f_type='element', f=square) # The difference between the two wrappers is that num_ouputs is not # specified in stream_agent; instead you pass a stream or a list of # streams for the parameter 'outputs'. ##################################################### ##################################################### ##################################################### #####################################################
[docs]def main(): # Illustration of timer: Part 3. Example 1 # and print_stream: Part 2a. Example 2 # Create a stream x and call it # 'natural numbers' x = Stream('natural numbers') # Create an agent that prints the stream print_stream(x) # call function timer to populate the stream timer( output_stream=x, num_outputs=5, time_period=0.1) ################################################ # PART 1a ################################################ ################################################ print print '**************************************************' print 'EXAMPLES OF SINGLE INPUT, SINGLE OUTPUT, STATELESS' print '**************************************************' print 'Part 1a. Example 1' # Illustration of square_stream # Create a stream y whose elements are the squares # of the elements of x. y = square_stream(x) # Give the stream a name. # A name helps in reading the output. y.set_name('Squares of x') print_stream(y) ################################################ print print 'Part 1a. Example 2' # Illustration of double_stream. Part 1a. Example 2 # Create a stream z whose elements are twice the # elements of x z = double_stream(x) # Give the stream a name and print it. z.set_name('Doubles of x') print_stream(z) ################################################ print print 'Part 1a. Example 3' w1 = double_stream(square_stream(x)) w2 = square_stream(double_stream(x)) w1.set_name('Doubles of squares of x') w2.set_name('Squares of doubles of x') print_stream(w1) print_stream(w2) ################################################ print print 'Part 1a. Example 4' v = discard_odds(x) v.set_name('Even numbers in x') print_stream(v) v1 = discard_odds_1(x) v1.set_name('Even numbers or None in x') print_stream(v1) ################################################ print print 'Part 1a. Example 5' u = evens_and_halves(x) u.set_name('Evens and halves of evens in x') print_stream(u) u3 = evens_and_halves_3(x) u3.set_name('Tuples of evens and halves of evens in x') print_stream(u3) ################################################ print print 'Part 1a. Example 6' s = multiply_elements_in_stream(stream=x, multiplier=3) s.set_name('Three times x') print_stream(s) ################################################ print print 'Part 1a. Example 7' r = boolean_of_values_greater_than_threshold( stream=x, threshold=2) r.set_name('Indicator of values above 2 in x') print_stream(r) ################################################ # PART 1B ################################################ ################################################ print print '**************************************************' print 'EXAMPLES OF SINGLE INPUT, SINGLE OUTPUT STATEFUL' print '**************************************************' print 'Part 1b. Example 1' q = cumulative_stream(x) q.set_name('Cumulative sum of x') print_stream(q) ################################################ print print 'Part 1b. Example 2' o = average_stream(x) o.set_name('Average of x') print_stream(o) ################################################ # PART 2: SINKS ################################################ ################################################ print print '**************************************************' print 'EXAMPLES OF SINKS. NO OUTPUTS' print '**************************************************' print 'Part 2. Example 1' print0(x) ################################################ print print 'Part 2. Example 2' print_stream(x) ################################################ print print 'Part 2. Example 3' stream_to_file(x, 'temp') ################################################ print print 'Part 2. Example 4' alpha = Stream() beta = Stream() print_sums([alpha, beta]) alpha.extend([5, 8, 13, 19, 25, 30]) beta.extend([3, 16, 27, 11]) ################################################ # PART 3: SOURCES ################################################ ################################################ print print '**************************************************' print 'EXAMPLES OF SOURCES: NO INPUTS' print '**************************************************' print 'Part 3. Example 2' c = Stream('Random numbers') rand(output_stream=c, num_outputs=5, time_period=0.05) print_stream(c) ################################################ print print 'Part 3. Example 3' b = Stream('stream from file temp') print_stream(b) file_to_stream('temp', b) ################################################ print print 'Part 3. Example 4' a = single_stream_of_random_numbers(x) a.set_name('Stream of random numbers triggered by x') print_stream(a) ################################################ # PART 4A: SPLIT. STATELESS ################################################ ################################################ print print '**************************************************' print 'EXAMPLES OF SPLITS, STATELESS' print 'SINGLE INPUT, MULTIPLE OUTPUTS' print '**************************************************' print 'Part 4. Example 1' sqr, dbl = square_and_double_stream(x) sqr.set_name('square of x') dbl.set_name('twice x') print_streams([sqr, dbl]) ################################################ print print 'Part 4. Example 2' exp, mul, div = \ exp_mult_div_stream( stream=x, exponent=3, multiplier=10, divisor=0.25) exp.set_name('raise to 3rd power of x') mul.set_name('multiply 10 times x') div.set_name('divide by 0.25 of x') print_stream(exp) print_stream(mul) print_stream(div) print_streams([exp, mul, div]) ################################################ print print 'Part 4. Example 3' evens, odds = even_odd_stream(x) evens.set_name('even values of x') odds.set_name('odd values of x') print_streams([evens, odds]) ################################################ # PART 4B: SPLIT. STATEFUL ################################################ ################################################ print print '**************************************************' print 'EXAMPLES OF SPLIT, STATEFUL' print 'SINGLE INPUT, MULTIPLE OUTPUTS' print '**************************************************' print 'Part 4B. Example 1' input_temp_humid_stream = Stream('input temperature and humidity') print_stream(input_temp_humid_stream) input_temp_humid_stream.extend([ ('temperature', 2015101501, 60), ('humidity', 2015101501, 30), ('temperature', 2015101502, 61), ('temperature', 2015101503, 62), ('humidity', 2015101503, 31), ('temperature', 2015101504, 63), ('temperature', 2015101505, 65), ('humidity', 2015101505, 32) ]) temperature_stream, humidity_stream = \ temperature_and_humidity_streams(input_temp_humid_stream, DELTA=1) temperature_stream.set_name('changing temperatures') humidity_stream.set_name('changing humidities') print_stream(temperature_stream) print_stream(humidity_stream) ################################################ # PART 5A: MERGE. STATELESS ################################################ ################################################ print print '**************************************************' print 'EXAMPLES OF MERGE, STATELESS' print 'MULTIPLE INPUTS, SINGLE OUTPUT' print '**************************************************' print 'Part 5A. Example 1' print 'Input streams are aaa, bbb, and ccc' aaa = Stream('aaa') bbb = Stream('bbb') ccc = Stream('ccc') print_stream(aaa) print_stream(bbb) print_stream(ccc) ddd = mean_stream([aaa, bbb, ccc]) ddd.set_name('mean of aaa, bbb, ccc') print_stream(ddd) aaa.extend([30, 25, 50, 6, 10]) bbb.extend([10, 15, 70, 8, 6]) ccc.extend([20, 21, 30, 4]) ################################################ # PART 5B: MERGE. STATEFUL ################################################ ################################################ print print '**************************************************' print 'EXAMPLES OF MERGE, STATEFUL' print 'MULTIPLE INPUTS, SINGLE OUTPUT' print '**************************************************' print 'Part 5B. Example 1' print 'Input streams are aaa, bbb, and ccc' eee = max_stream([aaa, bbb, ccc]) eee.set_name('max of aaa, bb, ccc') print_stream(eee) ################################################ # PART 6: MANY TO MANY. STATELESS ################################################ ################################################ print print '**************************************************' print 'EXAMPLES OF MANY-TO-MANY, STATELESS' print 'MULTIPLE INPUTS, MULTIPLE OUTPUTS' print '**************************************************' print 'Part 6A. Example 1' print 'Input streams are xx and yy' print 'A=2, B=1, DELTA=2' xx = Stream('x_stream') yy = Stream('y_stream') print_stream(xx) print_stream(yy) inrange_stream, outlier_stream = \ inrange_and_outlier_streams( x_and_y_streams=[xx,yy], A=2, B=1, DELTA=2) inrange_stream.set_name('inrange') outlier_stream.set_name('outlier') print_stream(inrange_stream) print_stream(outlier_stream) xx.extend([3, 5, 8, 4, 6, 2]) yy.extend([7, 14, 2, 10, 12, 9]) ################################################ # PART 6: MANY TO MANY. STATEFUL ################################################ ################################################ print print '**************************************************' print 'EXAMPLES OF MANY-TO-MANY, STATEFUL' print 'MULTIPLE INPUTS, MULTIPLE OUTPUTS' print '**************************************************' print 'Part 6B. Example 1' max_stream_2, min_stream_2 = max_and_min_stream([aaa, bbb, ccc]) max_stream_2.set_name('max from max_and_min of aaa, bbb, ccc') min_stream_2.set_name('min from max_and_min of aaa, bbb, ccc') print_stream(max_stream_2) print_stream(min_stream_2) ################################################ print print '**************************************************' print 'EXAMPLES OF ASYNCHRONOUS MERGE' print 'MULTIPLE INPUTS, SINGLE OUTPUT' print '**************************************************' print 'Part 7. Example 2' print 'Input streams are xxx and yyy' xxx = Stream('xxx') yyy = Stream('yyy') print_stream(xxx) print_stream(yyy) join_of_xxx_and_yyy_streams = join_streams([xxx,yyy]) join_of_xxx_and_yyy_streams.set_name('Join of xxx and yyy streams') print_stream(join_of_xxx_and_yyy_streams) N = 3 for i in range(N): xxx.append(random.randint(0,10)) yyy.append(random.randint(10,20)) print print '**************************************************' print 'EXAMPLES OF ASYNCHRONOUS MERGE' print 'MULTIPLE INPUTS, SINGLE OUTPUT WITH STATE' print '**************************************************' print 'Part 7. Example 3' print 'Input streams are aaaa and bbbb' aaaa = Stream('aaaa') bbbb = Stream('bbbb') print_stream(aaaa) print_stream(bbbb) mx_stream = max_seen_across_all_streams([aaaa, bbbb]) mx_stream.set_name('max across all streams') print_stream(mx_stream) N = 3 for i in range(N): aaaa.append(random.randint(100,110)) bbbb.append(random.randint(110,120)) print print '**************************************************' print 'EXAMPLES OF ASYNCHRONOUS MERGE' print 'MULTIPLE INPUTS, MULTIPLE OUTPUTS' print '**************************************************' print 'Part 7. Example 4' print 'Input streams are cccc and dddd' cccc = Stream('cccc') dddd = Stream('dddd') print_stream(cccc) print_stream(dddd) mxx_stream, mnn_stream = max_and_min_seen_across_all_streams([cccc,dddd]) mxx_stream.set_name('max from max and min of all streams') mnn_stream.set_name('min from max and min of all streams') print_streams([mxx_stream, mnn_stream]) cccc.extend([30, 25, 80, 50, 90]) dddd.extend([30, 15, 110, 10, 20, 2]) print print '**************************************************' print '**************************************************' print '**************************************************' print 'EXAMPLES CREATING AGENTS RATHER THAN STREAMS' print '**************************************************' print '**************************************************' print '**************************************************' print print 'Section 1 Example 1 using Agents' # Create a stream y_a = Stream('squares of x') print_stream(y_a) # Create the agent that populates the stream. stream_agent(inputs=x, outputs= y_a, f_type='element', f=square) print print '**************************************************' print 'Section 1 Example 2 using Agents' # Create a stream z_a = Stream('Doubles of x') print_stream(z_a) # Create the agent that populates the stream. stream_agent(inputs=x, outputs= z_a, f_type='element', f=double) print print '**************************************************' print 'Section 1 Example 4 using Agents' # Create a stream vv_a = Stream('Even numbers in x') print_stream(vv_a) # Create the agent that populates the stream. stream_agent(inputs=x, outputs= vv_a, f_type='element', f=even) print print '**************************************************' print 'Section 7 Example 3 using Agents' def max_agent_seen_across_all_streams(list_of_input_streams, output_stream): return stream_agent( inputs=list_of_input_streams, outputs=output_stream, f_type='asynch_element', f=max_of_all_inputs, state = -1 ) mx_a_stream = Stream('max agent stream of aaaa, bbbb') print_stream(mx_a_stream) max_agent = max_agent_seen_across_all_streams([aaaa, bbbb], mx_a_stream) print print '**************************************************' print 'Section 7 Example 4 using Agents' def max_and_min_agent(list_of_input_streams, max_and_min_output_streams): def max_and_min_with_names(list_of_numbers, state): previous_max, previous_min, current_time = state current_max = max(list_of_numbers) current_min = min(list_of_numbers) current_time += 1 if previous_max >= current_max: msg_max = _no_value else: max_index = list_of_numbers.index(current_max) max_stream_name = list_of_input_streams[max_index].name msg_max = (current_max, max_stream_name, current_time) previous_max = current_max if previous_min <= current_min: msg_min = _no_value else: min_index = list_of_numbers.index(current_min) min_stream_name = list_of_input_streams[min_index].name msg_min = (current_min, min_stream_name, current_time) previous_min = current_min state = (previous_max, previous_min, current_time) msgs_max_and_min = [msg_max, msg_min] return (msgs_max_and_min, state) # Finished def max_and_min_with_names # Agent Wrapper return stream_agent(inputs=list_of_input_streams, outputs=max_and_min_output_streams, f_type='element', f=max_and_min_with_names, state=(-1, 10000, -1) # Initial (max, min, time) ) list_of_input_streams = [aaa, bbb, ccc] max_a = Stream('max of aaa, bbb, ccc') min_a = Stream('min of aaa, bbb, ccc') print_stream(max_a) print_stream(min_a) max_and_min_output_streams = [max_a, min_a] max_and_min_agent(list_of_input_streams, max_and_min_output_streams)
if __name__ == '__main__': main()