""" This module contains the Agent class. The Agent
and Stream classes are the building blocks of
PythonStreams.
"""
from Stream import Stream, StreamArray, StreamSeries, StreamTimed
from collections import namedtuple
import numpy as np
#import math
# EPSILON is a small number used to prevent division by 0
# and other numerical problems
EPSILON = 1E-12
"""
Named_Tuple
----------------
InList : a named_tuple with arguments:
list, start, stop
An InList defines the list slice:
list[start:stop]
"""
InList = namedtuple('InList', ['list', 'start', 'stop'])
[docs]class Agent(object):
"""
An agent is an automaton: a state-transition machine.
An agent is initialized in __init__ and a state
transition is executed by next().
An agent has lists of:
(1) input streams,
(2) output streams and
(3) call streams.
Streams are described in Stream.py.
During a state transition an agent:
(1) May read values from its input streams. (Note that
reading values in a stream does not change the
stream.)
(2) Append values to the tails of its output streams.
(3) Change the agent's own state.
When a call stream is modified the agent's next() method
is called which causes the agent to execute a state transition.
The default is that every input stream is also a call stream,
i.e., the agent executes a state transition when any of its
input streams is modified. For performance reasons, we
may not want the agent to execute state transitions each time
any of its input streams is modified; we may want the agent to
execute state transitions periodically --- for example, every
second. In this case, the call streams will be different from
the input streams. A call stream that has a value appended to
it every second will cause the agent to execute a state
transition every second.
Parameters
----------
in_streams : list of streams
out_streams : list of streams
call_streams : list of streams
When a new value is added to a stream in this list
a state transition is invoked.
This the usual way (but not the only way) in which
state transitions occur. A state transiton for an
agent ag can also be executed by calling ag.next()
state: object
The state of the agent. The state is updated after
a transition.
transition: function
This function is called by next() which
is the state-transition operation for this agent.
An agent's state transition is specified by
its transition function.
stream_manager : function
Each stream has management variables, such as whether
the stream is open or closed. After a state-transition
the agent executes the stream_manager function
to modify the management variables of the agent's output
and call streams.
name : str, optional
name of this agent
Attributes
----------
_in_lists: list of InList
InList defines the slice of a list.
The j-th element of _in_lists is an InList
that defines the slice of the j-th input stream
that may be read by this agent in a state
transition. For example, if
listj = _in_lists[j].lists
startj = _in_lists[j].start
stopj = _in_lists[j].stop
Then this agent can read the slice:
listj[startj:stopj]
of the jth input stream. This slice is a slice
of the most recent values of the stream.
_out_lists: list
The j-th element of _out_lists is the list of
values to be appended to the j-th output
stream after the state transition.
Methods
-------
next(stream_name=None)
Execute a state transition. The method has 3 parts:
(i) set up the data structures to execute
a state transition,
(ii) call transition to:
(a) get the values to be appended to output streams,
(b) get the next state, and
(c) update 'start' indices for each input stream.
The 'start' pointers are indices where the agent
asserts that it will no longer access
elements of its input streams with indices earlier
(i.e. smaller) that 'start'.
(iii) update data structures after the transition.
"""
def __init__(self, in_streams, out_streams, transition,
state=None, call_streams=None,
stream_manager=None, name=None):
self.in_streams = in_streams
self.out_streams = out_streams
self.state = state
self.transition = transition
# The default (i.e. when call_streams is None) is that
# the agent executes a state transition when any
# of its input streams is modified. If call_streams
# is not None, then the agent executes a state
# transition only when one of the specified call_streams is
# modified.
self.call_streams = in_streams if call_streams is None \
else call_streams
self.stream_manager = stream_manager
self.name = name
# Register this agent as a reader of its input streams.
for s in self.in_streams:
s.reader(self)
# Register this agent to be called when any of its call
# streams is extended.
for s in self.call_streams:
s.call(self)
# Initially each element of in_lists is the
# empty InList: list = [], start=stop=0
self._in_lists = [InList([], 0, 0) for s in self.in_streams]
self._in_lists_start_values = [0 for s in self.in_streams]
# Initially each element of _out_lists is the empty list.
self._out_lists = [[] for s in self.out_streams]
# When the agent is created, it executes a state transition
# reading the current values of its input streams. Note that
# even though the agent's call_streams may be empty, the agent
# executes a state transition when the agent is created.
self.next()
[docs] def next(self, stream_name=None):
"""Execute the next state transition.
This function does the following:
Part 1: set up data structures for the state transition.
Part 2: execute the state transition by calling self.transition
Part 3: update data structures after the transition.
This method can be called by any agent and is
called whenever a value is appended to any
stream in call_streams
Parameters
----------
stream_name : str, optional
A new value was appended to the stream with name
stream_name as a result of which this agent
executes a state transition.
"""
# PART 1
# Set up data structures, _in_lists, _out_lists, for
# the state transition.
# For a stream s, s.recent is the list that includes the
# most recent values of stream s.
# The values of s.recent[s.stop:] are arbitrary padding
# values (0), and the the values of s.recent[:s.stop]
# contain the s.stop most recent values of stream s.
# s.start[self] is an index where agent self
# will no longer access elements with indices less than
# s.start[self]. Therefore agent self will only access the slice
# s.recent[s.start[self]:s.stop]
# of stream s.
self._in_lists = [InList(s.recent, s.start[self], s.stop)\
for s in self.in_streams]
# Initially, the output lists of the agent are empty.
# Values will be appended to these output lists during a
# state transition.
self._out_lists = [[] for s in self.out_streams]
# PART 2
# Execute the transition_function.
# The transition function has two input parameters:
# (1) The recent values of input streams of the agent,
# i.e., self._in_lists
# (2) The state of the agent.
# The transition function returns:
# (1) The lists of values to be appended
# to its output streams (self._out_lists)
# (2) The new state (self.state), and
# (3) For each input stream s, the new value of s.start[self].
# This agent will no longer read elements of the
# stream with indexes less than s.start[self].
self._out_lists, self.state, self._in_lists_start_values = \
self.transition(self._in_lists, self.state)
# PART 3
# Update data structures after the state transition.
# Extend output streams with new values generated in the
# state transition.
# Checks
# If a parameter is None, convert it into the empty list.
if not self._out_lists: self._out_lists = list()
if not self.out_streams: self.out_streams = list()
if len(self._out_lists) != len(self.out_streams):
raise ValueError(
'number of output lists, {0}, not equal to number of output streams, {1}'.\
format(len(self._out_lists),len(self.out_streams)))
# Finished checking self._out_lists and self.out_streams
# Put each of the output lists computed in the state
# transition into each of the output streams.
for j in range(len(self.out_streams)):
self.out_streams[j].extend(self._out_lists[j])
# For the j-th input stream in_streams[j], inform this stream
# that this agent will no longer read elements of the stream with
# indices smaller than _in_lists_start_values[j].
for j in range(len(self.in_streams)):
self.in_streams[j].set_start(
self, self._in_lists_start_values[j])
# Update stream management variables.
if self.stream_manager is not None:
self.stream_manager(
self.out_streams,
self._in_lists, self._out_lists, self.state)