Operators module

This module has functions that convert operations on standard Python data structures to operations on streams.

The module has three collections of functions: (1) functions that convert operations on standard Python data structures to operations on streams. These functions operate on a list of input streams to generate a list of output streams. The functions deal with the following data structures:

  1. lists,
  2. individual elements of lists,
  3. sliding windows, and
  4. timed windows.
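
The difference between the first two kinds of function can be pictured with plain Python lists standing in for streams. This is only a sketch: the helper names apply_element_func and apply_list_func are hypothetical and are not part of this module.

```python
def apply_element_func(f, in_list):
    """'element' style: f sees one element at a time."""
    return [f(x) for x in in_list]


def apply_list_func(f, in_list):
    """'list' style: f sees the whole batch of new values at once."""
    return f(in_list)


new_values = [1, 2, 3, 4]

# An element function maps each value independently.
doubled = apply_element_func(lambda x: 2 * x, new_values)

# A list function may return a list of a different length.
evens = apply_list_func(lambda xs: [x for x in xs if x % 2 == 0], new_values)
```

Here doubled holds one output per input, while evens holds only the values the list function chose to keep.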

(2) functions that map the general case of multiple input streams and multiple output streams described above to the following special cases:

  1. merge: an arbitrary number of input streams and a single output stream.
  2. split: a single input stream and an arbitrary number of output streams.
  3. op: a single input stream and a single output stream.
  4. source: no input and an arbitrary number of output streams.
  5. sink: no output and an arbitrary number of input streams.

These special cases simplify functions that need to be written for standard Python data structures. You can always use the multiple inputs and outputs case even if there is only one or no input or output; however, the functions for merge, split, op, source, and sink are simpler than the multiple input and output case.
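
One way to see why the special cases are simpler is to express them in terms of the general case, using plain lists in place of streams. The sketch below is an illustration under that assumption; the names many_to_many_lists, merge_lists, split_list and op_list are hypothetical stand-ins, not this module's functions.

```python
def many_to_many_lists(f, in_lists):
    """General case: f maps a list of input lists to a list of output lists."""
    return f(in_lists)


def merge_lists(f, in_lists):
    """Many inputs, one output: f returns a single list."""
    return many_to_many_lists(lambda ins: [f(ins)], in_lists)[0]


def split_list(f, in_list):
    """One input, many outputs: f takes a single list."""
    return many_to_many_lists(lambda ins: f(ins[0]), [in_list])


def op_list(f, in_list):
    """One input, one output: f maps a list to a list."""
    return many_to_many_lists(lambda ins: [f(ins[0])], [in_list])[0]


summed = merge_lists(lambda ins: [a + b for a, b in zip(*ins)],
                     [[1, 2, 3], [10, 20, 30]])
evens_odds = split_list(
    lambda xs: [[x for x in xs if x % 2 == 0], [x for x in xs if x % 2]],
    [1, 2, 3, 4])
squared = op_list(lambda xs: [x * x for x in xs], [1, 2, 3])
```

In each special case the user-supplied f no longer has to wrap or unwrap single-element lists, which is the simplification described above.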

(3) a function that provides a single common signature for converting operations on Python structures to operations on streams, regardless of whether the function has no inputs, a single input stream, or a list of input streams, and whether it has no outputs, a single output stream, or a list of output streams.

(12 October 2015. Mani. Changed initialization of output_lists.)

Operators.adjustable_window_agent(f, inputs, outputs, state, call_streams=None, window_size=None, step_size=None)
Operators.adjustable_window_func(f, inputs, num_outputs, state, call_streams, window_size, step_size)
Operators.assert_is_list(x)
Operators.assert_is_list_of_lists(x, list_size=None)
Operators.assert_is_list_of_streams(x)
Operators.assert_is_list_of_streams_or_None(x)
Operators.assert_is_list_or_None(x)
Operators.asynch_element_agent(f, inputs, outputs, state, call_streams, window_size, step_size)
Operators.asynch_element_func(f, inputs, num_outputs, state, call_streams=None, window_size=None, step_size=None)
Operators.awf(inputs, outputs, func, window_size, step_size, state=None, call_streams=None, **kwargs)
Operators.dynamic_window_agent(f, input_stream, output_stream, state, min_window_size, max_window_size, step_size)
Operators.dynamic_window_func(f, inputs, state, min_window_size, max_window_size, step_size)
Operators.ef(inputs, outputs, func, state=None, call_streams=None, **kwargs)
Operators.element_agent(f, inputs, outputs, state, call_streams, window_size, step_size)
Operators.element_func(f, inputs, num_outputs, state, call_streams, window_size, step_size)
Operators.h(f_type, *args)

Calls the wrapper function selected by the wrapper name f_type: list_func for ‘list’, element_func for ‘element’, window_func for ‘window’, and so on.

Operators.h_agent(f_type, *args)

Calls the agent wrapper selected by the wrapper name f_type: list_agent for ‘list’, element_agent for ‘element’, window_agent for ‘window’, and so on.
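
Selecting a wrapper by name, as h and h_agent do, is commonly done with a dictionary lookup. The following is a minimal sketch of that pattern and is not taken from this module's source; the three placeholder wrappers and the name dispatch are hypothetical.

```python
# Placeholder wrappers; each just reports which one was selected.
def list_wrapper(*args):
    return ('list', args)


def element_wrapper(*args):
    return ('element', args)


def window_wrapper(*args):
    return ('window', args)


WRAPPERS = {
    'list': list_wrapper,
    'element': element_wrapper,
    'window': window_wrapper,
}


def dispatch(f_type, *args):
    """Call the wrapper registered under the name f_type."""
    try:
        wrapper = WRAPPERS[f_type]
    except KeyError:
        raise ValueError('unknown wrapper name: %r' % (f_type,))
    return wrapper(*args)


selected = dispatch('element', 1, 2)
```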

Operators.lf(inputs, outputs, func, state=None, call_streams=None, **kwargs)
Operators.list_agent(f, inputs, outputs, state, call_streams, window_size, step_size)
Operators.list_func(f, inputs, num_outputs, state, call_streams, window_size, step_size)
Operators.list_index_for_timestamp(in_list, start_index, timestamp)

A helper function for timed operators. The basic idea is to return the earliest index in in_list.list[start_index:in_list.stop] with a time field that is greater than or equal to timestamp. If no such index exists then return a negative number.

Parameters:

in_list: InList

A slice into a stream, where InList = namedtuple('InList', ['list', 'start', 'stop']).

start_index: nonnegative integer

A pointer into in_list.list

timestamp: number

Returns:

An integer i such that either:

  ‘FOUND TIME WINDOW IN IN_LIST’:
    i >= start_index and i <= in_list.stop and
    (in_list[start_index] >= timestamp or
     in_list.list[i-2][0] < timestamp <= in_list.list[i-1][0])

or:

  ‘NO TIME WINDOW IN IN_LIST’:
    i < 0 (negative i indicates no time window) and
    (in_list.list[in_list.stop-1] <= timestamp or
     the list is empty, i.e. in_list.start == in_list.stop)
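
The basic idea stated at the top of this entry can be sketched as a linear scan. The sketch follows only the prose summary (earliest index whose time field is at least timestamp, negative if none); the exact return convention of list_index_for_timestamp may differ. It assumes each element is a sequence whose item 0 is the time field, and the name earliest_index_at_or_after is hypothetical.

```python
from collections import namedtuple

InList = namedtuple('InList', ['list', 'start', 'stop'])


def earliest_index_at_or_after(in_list, start_index, timestamp):
    """Return the earliest i in [start_index, in_list.stop) with
    in_list.list[i][0] >= timestamp, or -1 if there is none."""
    for i in range(start_index, in_list.stop):
        if in_list.list[i][0] >= timestamp:
            return i
    return -1


# Elements are (time, value) pairs in increasing time order.
data = InList(list=[(1, 'a'), (3, 'b'), (5, 'c'), (7, 'd')], start=0, stop=4)

found = earliest_index_at_or_after(data, 0, 4)
missing = earliest_index_at_or_after(data, 0, 8)
```

If the time fields are known to be sorted, a binary search could replace the scan.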

Operators.main()
Operators.many_outputs_source(f_type, f, num_outputs, state, call_streams, window_size, step_size)
Operators.many_to_many(f_type, f, in_streams, num_outputs, state, call_streams, window_size, step_size)
Operators.many_to_many_agent(f_type, f, in_streams, out_streams, state, call_streams, window_size, step_size)
Operators.merge(f_type, f, in_streams, state, call_streams, window_size, step_size)
Operators.merge_agent(f_type, f, in_streams, out_stream, state, call_streams, window_size, step_size)
Operators.op(f_type, f, in_stream, state, call_streams, window_size, step_size)
Operators.op_agent(f_type, f, in_stream, out_stream, state, call_streams, window_size, step_size)
Operators.remove_novalue_and_open_multivalue(l)

This function returns a list that is the same as the input parameter l except that (1) _no_value elements in l are deleted and (2) each _multivalue element in l is opened; i.e., for an object _multivalue(list_x), each element of list_x appears in the returned list.

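
The behavior described above can be sketched as follows. NO_VALUE and MultiValue are hypothetical stand-ins for the library's _no_value and _multivalue objects, whose actual definitions are not shown on this page, and the function name is likewise illustrative.

```python
# Hypothetical stand-in for _no_value: a unique sentinel object.
NO_VALUE = object()


class MultiValue(object):
    """Hypothetical stand-in for _multivalue: wraps a list of values."""

    def __init__(self, values):
        self.values = list(values)


def remove_no_value_and_open_multi_value(items):
    """Drop NO_VALUE entries and splice in the contents of MultiValue entries."""
    result = []
    for item in items:
        if item is NO_VALUE:
            continue  # deleted from the output
        if isinstance(item, MultiValue):
            result.extend(item.values)  # "opened": each wrapped value appears
        else:
            result.append(item)
    return result


cleaned = remove_no_value_and_open_multi_value(
    [1, NO_VALUE, MultiValue([2, 3]), 4])
```
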
Operators.single_output_source(f_type, f, num_outputs, state, call_streams, window_size, step_size)
Operators.single_output_source_agent(f_type, f, out_stream, state, call_streams, window_size, step_size)
Operators.sink(f_type, f, in_stream, state, call_streams, window_size, step_size)
Operators.sink_merge(f_type, f, in_streams, state, call_streams, window_size, step_size)
Operators.split(f_type, f, in_stream, num_outputs, state, call_streams, window_size, step_size)
Operators.split_agent(f_type, f, in_stream, out_streams, state, call_streams, window_size, step_size)
Operators.stream_agent(inputs, outputs, f_type, f, state=None, call_streams=None, window_size=None, step_size=None)

Provides a common signature for converting functions f on standard Python data structures to streams.

Parameters:

f_type : {‘element’, ‘list’, ‘window’, ‘timed’, ‘asynch_element’}

f_type identifies the type of function f where f is the next parameter.

f : function

inputs : {Stream, list of Streams}

When stream_agent has:

  1. no input streams, inputs is None;
  2. a single input Stream, inputs is a single Stream;
  3. multiple input Streams, inputs is a list of Streams.

outputs : list of Streams

state : object

state is None or is an arbitrary object. The state captures all the information necessary to continue processing the input streams.

call_streams : None or list of Stream

If call_streams is None then the program sets it to inputs (converting inputs to a list of Stream if necessary). This function is called when, and only when, any stream in call_streams is modified.

window_size : None or int

window_size must be a positive integer if f_type is ‘window’ or ‘timed’. window_size is the size of the moving window on which the function operates.

step_size : None or int

step_size must be a positive integer if f_type is ‘window’ or ‘timed’. step_size is the number of steps by which the moving window moves on each execution of the function.

Returns:

None
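
The role of state can be illustrated with a plain-Python sketch in which lists stand in for batches arriving on a stream. It assumes a convention in which f takes (element, state) and returns (output, new_state); that convention and the helper name run_stateful are assumptions made for illustration, not something this page confirms.

```python
def run_stateful(f, elements, state):
    """Apply f to each element, threading the state through the calls."""
    outputs = []
    for x in elements:
        out, state = f(x, state)
        outputs.append(out)
    return outputs, state


def running_total(x, total):
    """Stateful element function: emits the sum of all values seen so far."""
    total += x
    return total, total


# First batch of values arrives; the initial state is 0.
first, state = run_stateful(running_total, [1, 2, 3], 0)

# A later batch continues from the saved state rather than starting over.
second, state = run_stateful(running_total, [4, 5], state)
```

Because the saved state carries the running total, the second batch needs no access to the earlier values.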

Operators.stream_func(inputs, f_type, f, num_outputs, state=None, call_streams=None, window_size=None, step_size=None)

Provides a common signature for converting functions f on standard Python data structures to streams.

Parameters:

f_type : {‘element’, ‘list’, ‘window’, ‘timed’, ‘asynch_element’}

f_type identifies the type of function f where f is the next parameter.

f : function

inputs : {Stream, list of Streams}

When stream_func has:

  1. no input streams, inputs is None;
  2. a single input Stream, inputs is a single Stream;
  3. multiple input Streams, inputs is a list of Streams.

num_outputs : int

A nonnegative integer which is the number of output streams of this function.

state : object

state is None or is an arbitrary object. The state captures all the information necessary to continue processing the input streams.

call_streams : None or list of Stream

If call_streams is None then the program sets it to inputs (converting inputs to a list of Stream if necessary). This function is called when, and only when, any stream in call_streams is modified.

window_size : None or int

window_size must be a positive integer if f_type is ‘window’ or ‘timed’. window_size is the size of the moving window on which the function operates.

step_size : None or int

step_size must be a positive integer if f_type is ‘window’ or ‘timed’. step_size is the number of steps by which the moving window moves on each execution of the function.

Returns:

list of Streams

Function f is applied to the appropriate data structure in the input streams to put values in the output streams. stream_func returns the output streams.
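
How window_size and step_size interact for an f_type of ‘window’ can be sketched on a plain list. The helpers sliding_windows and apply_window_func are hypothetical and only illustrate the idea of a moving window; they are not this module's implementation and ignore timed windows.

```python
def sliding_windows(values, window_size, step_size):
    """Return each full window of window_size values, advancing by step_size."""
    if window_size < 1 or step_size < 1:
        raise ValueError('window_size and step_size must be positive integers')
    last_start = len(values) - window_size
    return [values[start:start + window_size]
            for start in range(0, last_start + 1, step_size)]


def apply_window_func(f, values, window_size, step_size):
    """Apply f to each window and collect one output per window."""
    return [f(w) for w in sliding_windows(values, window_size, step_size)]


windows = sliding_windows([1, 2, 3, 4, 5, 6], window_size=3, step_size=2)
means = apply_window_func(lambda w: sum(w) / float(len(w)),
                          [1, 2, 3, 4, 5, 6], window_size=3, step_size=2)
```

With six values, a window of three and a step of two, only two full windows fit; the trailing value 6 waits until more values arrive.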

Operators.tf(inputs, outputs, func, window_size, step_size, state=None, call_streams=None, **kwargs)
Operators.timed_agent(f, inputs, outputs, state, call_streams, window_size, step_size)
Operators.timed_func(f, inputs, num_outputs, state, call_streams, window_size, step_size)
Operators.wf(inputs, outputs, func, window_size, step_size, state=None, call_streams=None, **kwargs)
Operators.window_agent(f, inputs, outputs, state, call_streams, window_size, step_size)
Operators.window_func(f, inputs, num_outputs, state, call_streams, window_size, step_size)