from Stream import Stream, _no_value, _multivalue, TimeAndValue
from Operators import stream_func, stream_agent
from examples_element_wrapper import print_stream
import numpy as np
import random
############################################################
############################################################
# SECTION 1. SINGLE INPUT, SINGLE OUTPUT, STATELESS
############################################################
############################################################
# Banner for Section 1.
# print('') emits the same blank line as a bare Python 2 ``print``
# statement, and print('text') prints the same text; unlike the
# statement form, both also run unchanged on Python 3.
print('')
print('**************************************************')
print('SECTION 1')
print('EXAMPLES OF SINGLE INPUT, SINGLE OUTPUT, STATELESS')
print('**************************************************')
#______________________________________________________
#
# EXAMPLE 1: SINGLE INPUT, SINGLE OUTPUT, STATELESS
#______________________________________________________
# Banner for Section 1, Example 1.
print('')
print('--------------------------------------------------')
print('SECTION 1. EXAMPLE 1 ')
print(' SINGLE INPUT, SINGLE OUTPUT, STATELESS')
print('--------------------------------------------------')
#
# SPECIFICATION:
# Write a function that sums the values in a time-window
# in a single input stream. The elements of the input stream
# are TimeAndValue objects with a time field, and a value
# field. If x and y are elements in the stream and y follows
# x then y's timestamp is greater than x's timestamp.
# A window of length T time units includes exactly those
# elements in the stream with time stamps in the interval:
# [window_start_time : window_start_time + T].
# The window_start_time moves forward at each step by step_size
# time units; so the sequence of windows are
# [0 : T], [step_size : step_size + T],
# [2*step_size : 2*step_size + T], [3*step_size : 3*step_size + T]
# If window_size=4.0 and step_size=2.0 then the output stream
# will consist of the sum of the values with timestamps in the
# intervals [0:4], [2:6], [4:8], ...
# HOW TO DEVELOP THE STREAMING PROGRAM.
# FIRST STEP:
# Write a function on a timed list.
def sum_values_in_timed_list(timed_list):
    """
    Return the sum of the values in one timed window.

    Parameters
    ----------
    timed_list: list
       Items that expose a ``value`` attribute (TimeAndValue
       objects in the examples of this file).

    Returns
    -------
    The sum of ``item.value`` over the list; 0 if the list is empty.

    """
    return sum(item.value for item in timed_list)
# Input stream for this example. It is passed to print_stream so
# that its contents can be checked visually.
a = Stream('a timed stream')
print_stream(a)

# SECOND STEP.
# Wrap sum_values_in_timed_list with the 'timed' wrapper.
# z is the output stream for this example.
z = stream_func(
    inputs=a,                    # the input is a single stream
    f_type='timed',              # identifies the 'timed' wrapper
    f=sum_values_in_timed_list,  # function that is wrapped
    num_outputs=1,               # single output stream
    window_size=4.0,
    step_size=2.0)
z.set_name('sum of a')
print_stream(z)

# Drive the input stream with 20 elements. Each timestamp is the
# previous one plus random.random(); each value is a random
# integer between 0 and 9 inclusive.
t = 0.0
for _ in range(20):
    t += random.random()
    v = random.randint(0, 9)
    a.append(TimeAndValue(t, v))
############################################################
############################################################
# SECTION 2. SINGLE INPUT, SINGLE OUTPUT, STATEFUL
############################################################
############################################################
# Banner for Section 2. The parenthesized single-argument form
# prints the same text as the Python 2 print statement and is also
# valid on Python 3; print('') replaces the bare ``print``.
print('')
print('**************************************************')
print('SECTION 2')
print('EXAMPLES OF SINGLE INPUT, SINGLE OUTPUT, STATEFUL')
print('**************************************************')
#_____________________________________________________________
# EXAMPLE 1
#_____________________________________________________________
# SPECIFICATION:
# Write a function, exponential_smoothed_timed_windows,
# that computes func(window) for each
# timed window, where func is a parameter. The agent
# returns the exponentially smoothed value of func.
# The smoothing factor, alpha, is a parameter.
# HOW TO DEVELOP THE STREAMING PROGRAM.
# FIRST STEP:
# This computation has state to which smoothing is applied
# Write a function, exponential_smoothed_list, with
# parameters: a timed list and state. This function reads
# the parameter alpha of the stream function; so encapsulate
# exponential_smoothed_list within
# exponential_smoothed_timed_windows
# SECOND STEP.
# Wrap the function with the 'timed' wrapper.
def exponential_smoothed_timed_windows(
        input_stream, func, alpha,
        window_size, step_size,
        initial_state):
    """
    Wrap exponential smoothing of func(window) as a 'timed' stream
    function with a single input and a single output.

    Parameters
    ----------
    input_stream: Stream
       A previously defined stream
       This is the only input stream of the agent.
    func: function
       func operates on a list of TimeAndValue objects
       and returns an object that can be smoothed
       exponentially.
    alpha: number
       The exponential smoothing parameter. It is the weight
       given to the previous smoothed value; (1 - alpha) is the
       weight given to func of the current window.
    window_size, step_size: number
       Passed unchanged to stream_func as the window length and
       the step between successive windows.
    initial_state:
       Passed to stream_func as the initial state, i.e. the
       smoothed value used for the first window.

    Returns
    -------
    The result of stream_func with num_outputs=1. The example
    below treats it as a stream (it calls set_name on it).

    """
    def exponential_smoothed_list(timed_list, state):
        # state is the smoothed value from the previous step.
        # The new smoothed value is both the output message and
        # the next state.
        smoothed = ((1 - alpha)*func(timed_list) +
                    alpha*state)
        return (smoothed, smoothed)

    return stream_func(
        inputs=input_stream,          # single input timed stream
        f_type='timed',               # identifies 'timed' wrapper
        f=exponential_smoothed_list,  # function that is wrapped
        num_outputs=1,                # single output stream
        state=initial_state,
        window_size=window_size,
        step_size=step_size)
# Banner for this example. print('...') / print('') produce the
# same output as the Python 2 print statements they replace and
# are also valid on Python 3.
print('')
print('--------------------------------------------------')
print('SECTION 2. EXAMPLE 1 ')
print(' SINGLE INPUT, SINGLE OUTPUT, STATEFUL')
print('--------------------------------------------------')

# b is the input stream for this example
b = Stream('b: timed stream')
print_stream(b)

# y is the output stream for this example: the exponentially
# smoothed window sums of b, starting from a smoothed value of 0.
y = exponential_smoothed_timed_windows(
    input_stream=b,
    func=sum_values_in_timed_list,
    alpha=0.5,
    window_size=4,
    step_size=2,
    initial_state=0)
y.set_name('y')
print_stream(y)

# Drive the input with 12 elements whose timestamps increase by
# random.random() and whose values are random integers in 0..9.
t = 0.0
for _ in range(12):
    t += random.random()
    v = random.randint(0, 9)
    b.append(TimeAndValue(t, v))
############################################################
############################################################
# SECTION 3. MULTIPLE INPUTS, SINGLE OUTPUT, STATELESS
############################################################
############################################################
# Banner for Section 3. Written as print('...') so that the same
# lines are printed on Python 2 and the code also runs on Python 3;
# print('') stands in for the bare ``print`` blank line.
print('')
print('**************************************************')
print('SECTION 3')
print('EXAMPLES OF MULTIPLE INPUTS, SINGLE OUTPUT, STATELESS')
print('**************************************************')
#______________________________________________________
#
# EXAMPLE 1: TWO OR MORE INPUT STREAMS, ONE OUTPUT STREAM
# STATELESS
#______________________________________________________
# SPECIFICATION:
# Write a function that has a single parameter - a list of
# timed streams - and that returns the sum of the values of
# timed windows.
# For example, if the list consists of two timed streams, c
# and d, and:
# c = [(0.1, 100), (0.9, 200), (1.2, 500), (3.1, 800), (6.6, 300)]
# d = [(0.7, 5), (2.3, 25), (3.9, 12), (5.1, 18), (5.2, 12)]
# where for succinctness each pair is (time, value), then
# with a window size and step size of 1.0 the windows are:
# for c: [(0.1, 100), (0.9, 200)], [(1.2, 500)], [], [(3.1, 800)],
# [], []..
# for d: [(0.7, 5)], [], [(2.3, 25)], [(3.9, 12)], [], ...
# Note that we don't yet have the complete windows for the
# interval [5.0, 6.0] for d because we may get later values
# with timestamps less than 6 on stream d.
# The sums for the windows are:
# (100+200+5), (500), (25), (800+12), (),
# HOW TO DEVELOP THE STREAMING PROGRAM.
# FIRST STEP:
# Write a function with a single parameter: a list of timed lists
def sum_values_in_all_timed_lists(list_of_timed_lists):
    """
    Return the sum of the values over all the timed windows.

    Parameters
    ----------
    list_of_timed_lists: list of lists
       One timed list (window) per input stream. Each item of a
       timed list exposes a ``value`` attribute.

    Returns
    -------
    The sum of ``item.value`` over every item of every list;
    0 if there are no items at all.

    """
    return sum(
        sum(item.value for item in timed_list)
        for timed_list in list_of_timed_lists)
# Banner for this example. print('...') / print('') print the same
# lines as the Python 2 print statements they replace and are also
# valid on Python 3.
print('')
print('--------------------------------------------------')
print('SECTION 3. EXAMPLE 1 ')
print(' MULTIPLE INPUTS, SINGLE OUTPUT, STATELESS')
print('--------------------------------------------------')

# Create input streams, c and d, for this example.
c = Stream('Input: c')
d = Stream('Input: d')
print_stream(c)
print_stream(d)

# SECOND STEP.
# Wrap the function with the 'timed' wrapper.
# x is the output stream for this example
x = stream_func(
    inputs=[c, d],                    # list of two input timed streams
    f_type='timed',                   # identifies the 'timed' wrapper
    f=sum_values_in_all_timed_lists,  # function that is wrapped
    num_outputs=1,                    # single output stream
    window_size=2.0,
    step_size=2.0)
x.set_name('Output: x')
print_stream(x)

# Drive the input streams. Each stream has its own clock. Values on
# d are offset by 100 so they differ visibly from those on c.
t_c = 0.0
t_d = 0.0
for _ in range(12):
    t_c += random.random()
    t_d += random.random()
    v_c = random.randint(0, 9)
    v_d = 100+random.randint(0, 9)
    c.append(TimeAndValue(t_c, v_c))
    d.append(TimeAndValue(t_d, v_d))
#______________________________________________________
#
# EXAMPLE 2: TWO OR MORE INPUT STREAMS, ONE OUTPUT STREAM
# STATELESS
#______________________________________________________
# SPECIFICATION:
# Write a function that has two input streams and a
# single output stream. An element on the output stream is
# the difference in lengths of the two windows (one window
# per input stream).
# HOW TO DEVELOP THE STREAMING PROGRAM.
# FIRST STEP
# Write a function on a list of two lists.
def diff_of_counts_in_lists(list_of_two_lists):
    """
    Return how many more items the first list has than the second.

    Parameters
    ----------
    list_of_two_lists: list of lists
       Only the elements at index 0 and index 1 are used.

    Returns
    -------
    int
       len(first list) - len(second list); negative when the
       second list is longer.

    """
    first_count = len(list_of_two_lists[0])
    second_count = len(list_of_two_lists[1])
    return first_count - second_count
# Banner for this example. print('...') / print('') print the same
# lines as the Python 2 print statements they replace and are also
# valid on Python 3.
print('')
print('--------------------------------------------------')
print('SECTION 3. EXAMPLE 2 ')
print(' MULTIPLE INPUTS, SINGLE OUTPUT, STATELESS')
print('--------------------------------------------------')

# Create input streams, cc and dd, for this example.
cc = Stream('cc')
dd = Stream('dd')
print_stream(cc)
print_stream(dd)

# SECOND STEP.
# Wrap the function with the 'timed' wrapper.
# xx is the output stream for this example
xx = stream_func(
    inputs=[cc, dd],            # inputs is a list of two streams
    f_type='timed',             # identifies the 'timed' wrapper
    f=diff_of_counts_in_lists,  # function that is wrapped
    num_outputs=1,              # single output stream
    window_size=2.0,
    step_size=2.0)
xx.set_name('xx')
print_stream(xx)

# Drive the input streams; each stream has its own clock.
t_cc = 0.0
t_dd = 0.0
for _ in range(10):
    t_cc += random.random()
    t_dd += random.random()
    v_cc = random.randint(0, 9)
    v_dd = random.randint(0, 9)
    cc.append(TimeAndValue(t_cc, v_cc))
    dd.append(TimeAndValue(t_dd, v_dd))
############################################################
############################################################
# SECTION 4. MULTIPLE INPUTS, SINGLE OUTPUT, STATEFUL
############################################################
############################################################
# Banner for Section 4. print('') gives the same blank line as a
# bare Python 2 ``print``; the parenthesized form also runs on
# Python 3, where the statement form is a syntax error.
print('')
print('**************************************************')
print('SECTION 4')
print('EXAMPLES OF MULTIPLE INPUTS, SINGLE OUTPUT, STATEFUL')
print('**************************************************')
#______________________________________________________
#
# EXAMPLE 1. TWO OR MORE INPUT STREAMS, ONE OUTPUT STREAM
# STATEFUL
#______________________________________________________
#
# SPECIFICATION:
# Write a function with a list of input streams that
# returns a stream in which each element is a 2-tuple
# (max_so_far, max_of_current_window) where
# max_of_current_window is the max over all input
# streams of the sums of the values in each timed
# window, and
# max_so_far is the maximum value of max_of_current_window
# over all the windows seen thus far.
# HOW TO DEVELOP THE STREAMING PROGRAM.
# FIRST STEP:
# Write a function, max_sums_timed_windows, with two
# parameters: a list of timed lists, and a state.
# The state is the maximum value seen thus far.
# The function returns a message which is the 2-tuple
# (max_so_far, max_of_current_window), the maximum
# seen so far, and the current maximum, i.e., the
# maximum over all current windows of the sum of the
# window.
def max_sums_timed_windows(list_of_timed_lists, state):
    """
    Track the largest window sum seen so far across several streams.

    Parameters
    ----------
    list_of_timed_lists: list of lists
       One timed list (window) per input stream. Each item of a
       timed list exposes a ``value`` attribute.
    state: number
       The maximum seen so far, before this step.

    Returns
    -------
    (message, next_state)
       message is the 2-tuple (max_so_far, max_of_current_window),
       where max_of_current_window is the largest per-list sum of
       values in this step (an empty list sums to 0);
       next_state is the updated max_so_far.

    Raises
    ------
    ValueError
       If list_of_timed_lists itself is empty (max of no items).

    """
    max_of_current_window = max(
        sum(item.value for item in timed_list)
        for timed_list in list_of_timed_lists)
    # Update the max seen so far.
    max_so_far = max(state, max_of_current_window)
    return ((max_so_far, max_of_current_window), max_so_far)
# Banner for this example. print('...') / print('') print the same
# lines as the Python 2 print statements they replace and are also
# valid on Python 3.
print('')
print('--------------------------------------------------')
print('SECTION 4. EXAMPLE 1 ')
print(' MULTIPLE INPUTS, SINGLE OUTPUT, STATEFUL')
print('--------------------------------------------------')

# Create input streams, ee and ff, for this example.
ee = Stream('ee')
ff = Stream('ff')
print_stream(ee)
print_stream(ff)

# SECOND STEP.
# Wrap the function with the 'timed' wrapper.
# w is the output stream of the wrapped function.
w = stream_func(
    inputs=[ee, ff],           # list of two input timed streams
    f_type='timed',            # identifies the 'timed' wrapper
    f=max_sums_timed_windows,  # function being wrapped
    num_outputs=1,             # single output stream
    state=0.0,                 # initial state: max seen so far
    window_size=1.0,
    step_size=1.0)
w.set_name('w')
print_stream(w)

# Drive the input streams; each stream has its own clock.
t_ee = 0.0
t_ff = 0.0
for _ in range(8):
    t_ee += random.random()
    t_ff += random.random()
    v_ee = random.randint(0, 9)
    v_ff = random.randint(0, 9)
    ee.append(TimeAndValue(t_ee, v_ee))
    ff.append(TimeAndValue(t_ff, v_ff))
############################################################
############################################################
# SECTION 5. SINGLE INPUT, MULTIPLE OUTPUT, STATELESS
############################################################
############################################################
# Banner for Section 5. The single-argument print('...') form is
# output-identical to the Python 2 print statement and also valid
# on Python 3; print('') replaces the bare ``print`` blank line.
print('')
print('**************************************************')
print('SECTION 5')
print('EXAMPLES OF SINGLE INPUT, MULTIPLE OUTPUTS, STATELESS')
print('**************************************************')
#_____________________________________________________________
# EXAMPLE 1: SINGLE INPUT, TWO OR MORE OUTPUTS, STATELESS
#_____________________________________________________________
# SPECIFICATION:
# Write a function that has a single input stream and
# that returns two output streams containing the max and
# the min values of windows of the input stream.
# HOW TO DEVELOP THE STREAMING PROGRAM.
# FIRST STEP:
# Write a function, max_and_min_of_values_in_timed_list, with a
# single parameter: a timed list
def max_and_min_of_values_in_timed_list(timed_list):
    """
    Return the largest and smallest value in one timed window.

    Parameters
    ----------
    timed_list: list
       Items that expose a ``value`` attribute.

    Returns
    -------
    (max_value, min_value)
       (None, None) when timed_list is empty.

    """
    if not timed_list:
        # timed_list is empty
        return (None, None)
    # Read the values once and reuse them for both max and min.
    values = [item.value for item in timed_list]
    return (max(values), min(values))
# Banner for this example. print('...') / print('') print the same
# lines as the Python 2 print statements they replace and are also
# valid on Python 3.
print('')
print('--------------------------------------------------')
print('SECTION 5. EXAMPLE 1 ')
print(' SINGLE INPUT, MULTIPLE OUTPUT, STATELESS')
print('--------------------------------------------------')

# Create input stream, g, for this example.
g = Stream('g')
print_stream(g)

# SECOND STEP.
# Wrap the function with the 'timed' wrapper.
# u, v are the two output streams of the wrapped function:
# u receives the max of each window and v the min.
u, v = stream_func(
    inputs=g,                               # single input stream
    f_type='timed',                         # identifies the 'timed' wrapper
    f=max_and_min_of_values_in_timed_list,  # function that is wrapped
    num_outputs=2,                          # two output streams
    window_size=2.0,
    step_size=2.0)
u.set_name('u')
v.set_name('v')
print_stream(u)
print_stream(v)

# Drive the input stream.
t_g = 0.0
for _ in range(10):
    t_g += random.random()
    v_g = random.randint(0, 9)
    g.append(TimeAndValue(t_g, v_g))
############################################################
############################################################
# SECTION 6. SINGLE INPUT, MULTIPLE OUTPUT, STATEFUL
############################################################
############################################################
# Banner for Section 6. print('...') prints exactly what the
# Python 2 print statement printed and is also valid on Python 3;
# print('') replaces the bare ``print`` used for a blank line.
print('')
print('**************************************************')
print('SECTION 6')
print('EXAMPLES OF SINGLE INPUT, MULTIPLE OUTPUTS, STATEFUL')
print('**************************************************')
#_____________________________________________________________
# SECTION 6 EXAMPLE 1: SINGLE INPUT, TWO OR MORE OUTPUTS, STATEFUL
#_____________________________________________________________
# SPECIFICATION:
# Write a function that has a single input stream and
# that returns two output streams. The elements of the
# output stream are the average of the maximum values
# of the timed windows, where the average is taken
# over all the windows seen so far, and similarly for
# the minimum.
# HOW TO DEVELOP THE STREAMING PROGRAM.
# FIRST STEP:
# Write a function, avg_of_max_and_min_in_timed_list, with two
# parameters: a timed list and a state. The function returns
# a message and a (new) state. The message is a 2-tuple
# (avg_of_max, avg_of_min), where each element of the tuple
# becomes a message in a different output stream. The state
# is (num_windows, sum_of_max, sum_of_min) where
# num_windows is the number of time steps so far for which timed_list
# is non-empty.
# sum_of_max is the sum over all time steps of the max for each step.
# sum_of_min is the sum over all time steps of the min for each step.
def avg_of_max_and_min_in_timed_list(timed_list, state):
    """
    Running averages of the per-window max and min of one stream.

    Parameters
    ----------
    timed_list: list
       The current timed window. Items expose a ``value``
       attribute.
    state: tuple
       (num_windows, sum_of_max, sum_of_min), where num_windows
       counts the non-empty windows seen so far and the sums are
       taken over those windows.

    Returns
    -------
    (message, next_state)
       message is the 2-tuple (avg_of_max, avg_of_min).
       It is (None, None) while no non-empty window has been seen,
       because no average exists yet.
       next_state is the updated state; an empty window leaves the
       state unchanged.

    """
    num_windows, sum_of_max, sum_of_min = state
    if timed_list:
        # timed_list is nonempty: fold this window into the sums.
        values = [item.value for item in timed_list]
        num_windows += 1
        sum_of_max += max(values)
        sum_of_min += min(values)
        state = (num_windows, sum_of_max, sum_of_min)
    # else: timed_list is empty, so don't change the state.
    # In particular, don't increment num_windows.

    if num_windows == 0:
        # No data yet. Dividing by num_windows would raise
        # ZeroDivisionError, so report "no average".
        return ((None, None), state)

    divisor = float(num_windows)
    message = (sum_of_max/divisor, sum_of_min/divisor)
    return (message, state)
# Banner for this example. print('...') / print('') print the same
# lines as the Python 2 print statements they replace and are also
# valid on Python 3.
print('')
print('--------------------------------------------------')
print('SECTION 6. EXAMPLE 1 ')
print(' SINGLE INPUT, MULTIPLE OUTPUTS, STATEFUL')
print('--------------------------------------------------')

# Create input stream, h, for this example.
h = Stream('h: Input stream')
print_stream(h)

# SECOND STEP.
# Wrap the function with the 'timed' wrapper.
# s_stream, t_stream are the two output streams of the wrapped function.
s_stream, t_stream = stream_func(
    inputs=h,                            # input is a single stream
    f_type='timed',                      # identifies the 'timed' wrapper
    f=avg_of_max_and_min_in_timed_list,  # function that is wrapped
    num_outputs=2,                       # two output streams
    state=(0, 0.0, 0.0),                 # initial num windows, sum max, sum min
    window_size=2.0,
    step_size=2.0)
s_stream.set_name('avg max')
t_stream.set_name('avg min')
print_stream(s_stream)
print_stream(t_stream)

# Drive the input stream.
t_h = 0.0
for _ in range(20):
    t_h += random.random()
    v_h = random.randint(0, 9)
    h.append(TimeAndValue(t_h, v_h))
############################################################
############################################################
# SECTION 7. MULTIPLE INPUTS, MULTIPLE OUTPUT, STATELESS
############################################################
############################################################
# Banner for Section 7. Uses print('...') and print('') so the
# output matches the Python 2 print statements while the code
# also runs on Python 3.
print('')
print('**************************************************')
print('SECTION 7')
print('EXAMPLES OF MULTIPLE INPUTS, MULTIPLE OUTPUTS, STATELESS')
print('**************************************************')
#_____________________________________________________________
# SECTION 7 EXAMPLE 1: MULTIPLE INPUTS, MULTIPLE OUTPUTS, STATELESS
#_____________________________________________________________
# SPECIFICATION:
# Write a function that has a single parameter, a list of timed
# streams. The function returns a list of two (untimed) streams.
# The k-th element of the first output stream is the maximum
# value across all input streams of the k-th timed window, and
# the corresponding element for the second output stream is the
# minimum value. If the k-th timed windows for all the input
# streams are empty, the k-th element of the output streams are
# both None.
# FIRST STEP:
# Write a function that has a single parameter: a list of timed lists.
# The function returns a 2-tuple: the max and the min of the values
# across all the timed lists if at least one timed list is nonempty,
# and None otherwise.
def max_and_min_values_in_all_timed_lists(list_of_timed_lists):
    """
    Return the largest and smallest value across several windows.

    Parameters
    ----------
    list_of_timed_lists: list of lists
       One timed list (window) per input stream. Each item of a
       timed list exposes a ``value`` attribute.

    Returns
    -------
    (max_value, min_value)
       Taken over every item of every list. (None, None) when all
       the lists are empty.

    """
    # Flatten once: empty lists contribute nothing, so max/min are
    # only ever applied to the values that actually exist.
    values = [item.value
              for timed_list in list_of_timed_lists
              for item in timed_list]
    if not values:
        return (None, None)
    return (max(values), min(values))
# Banner for this example. print('...') / print('') print the same
# lines as the Python 2 print statements they replace and are also
# valid on Python 3.
print('')
print('--------------------------------------------------')
print('SECTION 7. EXAMPLE 1 ')
print(' MULTIPLE INPUTS, MULTIPLE OUTPUTS, STATELESS')
print('--------------------------------------------------')

# Create input streams, i_stream and j_stream, for this example.
i_stream = Stream('i_stream: Input stream')
j_stream = Stream('j_stream: Input stream')
# Print the streams so that you can visually check the results.
print_stream(i_stream)
print_stream(j_stream)

# SECOND STEP.
# Wrap the function with the 'timed' wrapper.
# q_stream, r_stream are the two output streams of the wrapped function.
q_stream, r_stream = stream_func(
    inputs=[i_stream, j_stream],  # list of input timed streams
    f_type='timed',               # identifies the 'timed' wrapper
    f=max_and_min_values_in_all_timed_lists,
    num_outputs=2,                # two output streams
    window_size=3.0,
    step_size=3.0)
q_stream.set_name('max of i_stream, j_stream timed windows')
r_stream.set_name('min of i_stream, j_stream timed windows')
print_stream(q_stream)
print_stream(r_stream)

# Drive the two input streams; each stream has its own clock.
t_i = 0.0
t_j = 0.0
for _ in range(20):
    t_i += random.random()
    t_j += random.random()
    v_i = random.randint(0, 9)
    v_j = random.randint(0, 9)
    i_stream.append(TimeAndValue(t_i, v_i))
    j_stream.append(TimeAndValue(t_j, v_j))
############################################################
############################################################
# SECTION 8. MULTIPLE INPUTS, MULTIPLE OUTPUT, STATEFUL
############################################################
############################################################
# Banner for Section 8. print('') yields the blank line that the
# bare Python 2 ``print`` produced; the parenthesized calls print
# the same text and are also valid on Python 3.
print('')
print('**************************************************')
print('SECTION 8')
print('EXAMPLES OF MULTIPLE INPUTS, MULTIPLE OUTPUTS, STATEFUL')
print('**************************************************')
#_____________________________________________________________
# SECTION 8 EXAMPLE 1: MULTIPLE INPUTS, MULTIPLE OUTPUTS, STATEFUL
#_____________________________________________________________
# SPECIFICATION:
# Section 8, example 1 is to Section 7, example 1, what
# Section 6, example 1 is to Section 5, example 1. The
# outputs in this example are the AVERAGES of the max and min
# over timed windows of all input streams (whereas in the
# previous example, the outputs were the max and min values
# without averaging).
# FIRST STEP:
# Write a function that has two parameters: a list of timed lists and
# a state.
# The function returns a tuple consisting of
# (1) a 2-tuple: the max and the min of the values of the timed lists
# (2) the next state.
def avg_of_max_and_min_values_in_all_timed_lists(list_of_timed_lists, state):
    """
    Running averages of the per-step max and min across several streams.

    Parameters
    ----------
    list_of_timed_lists: list of lists
       One timed list (window) per input stream. Each item of a
       timed list exposes a ``value`` attribute.
    state: tuple
       (num_windows, sum_of_max, sum_of_min), where num_windows
       counts the steps so far in which at least one window held
       data and the sums are taken over those steps.

    Returns
    -------
    (message, next_state)
       message is the list [avg_of_max, avg_of_min].
       It is [None, None] while no data has been seen, because no
       average exists yet.
       next_state is the updated state; a step in which every
       window is empty leaves the state unchanged.

    """
    num_windows, sum_of_max, sum_of_min = state
    # A step counts when ANY window holds data, and the max/min are
    # taken over the values that exist. This mirrors the stateless
    # max_and_min_values_in_all_timed_lists example. Requiring every
    # window to be non-empty would silently drop steps in which
    # only one stream produced data.
    values = [item.value
              for timed_list in list_of_timed_lists
              for item in timed_list]
    if values:
        num_windows += 1
        sum_of_max += max(values)
        sum_of_min += min(values)
        state = (num_windows, sum_of_max, sum_of_min)

    if num_windows == 0:
        # No data yet. Dividing by num_windows would raise
        # ZeroDivisionError, so report "no average".
        return ([None, None], state)

    divisor = float(num_windows)
    return ([sum_of_max/divisor, sum_of_min/divisor], state)
# Banner for this example. print('...') / print('') print the same
# lines as the Python 2 print statements they replace and are also
# valid on Python 3.
print('')
print('--------------------------------------------------')
print('SECTION 8. EXAMPLE 1 ')
print(' MULTIPLE INPUTS, MULTIPLE OUTPUTS, STATEFUL')
print('--------------------------------------------------')

# Create input streams, k_stream and l_stream, for this example.
k_stream = Stream('k_stream: Input stream')
l_stream = Stream('l_stream: Input stream')
# Print the streams so that you can visually check the results.
print_stream(k_stream)
print_stream(l_stream)

# SECOND STEP.
# Wrap the function with the 'timed' wrapper.
# o_stream, p_stream are the two output streams of the wrapped function.
o_stream, p_stream = stream_func(
    inputs=[k_stream, l_stream],  # list of input timed streams
    f_type='timed',               # identifies the 'timed' wrapper
    f=avg_of_max_and_min_values_in_all_timed_lists,
    num_outputs=2,                # two output streams
    state=(0, 0.0, 0.0),          # initial num windows, sum_max, sum_min
    window_size=3.0,
    step_size=3.0)
o_stream.set_name('avg of max of k_stream, l_stream timed windows')
p_stream.set_name('avg of min of k_stream, l_stream timed windows')
print_stream(o_stream)
print_stream(p_stream)

# Drive the two input streams; each stream has its own clock.
t_k = 0.0
t_l = 0.0
for _ in range(30):
    t_k += random.random()
    t_l += random.random()
    v_k = random.randint(0, 9)
    v_l = random.randint(0, 9)
    k_stream.append(TimeAndValue(t_k, v_k))
    l_stream.append(TimeAndValue(t_l, v_l))