Process (Implementation) [5%]#

class DataSourceProcess(*args: 'Any', **kwargs: 'Any')[source]#

Bases: Process, ProcessOracleSubscriber

data_source: DataSource = NOTSET#
post_init(self) None[source]#
Description
Runs after __post_init__; does nothing here.
process_query(self, subscription) None[source]#
Description
Tries to update the given subscription.
Parameters:

subscription (Subscribable) – The subscription to be updated

query(self, queries) data_points[source]#
Description
Takes a list of queries and returns the queries together with their associated results.
Parameters:

queries (NDArray) – A list of queries

Returns:

A tuple of queries and their results

Return type:

Tuple[NDArray, NDArray]

query_constrain(self) QueryConstrain[source]#
Description
Returns the QueryConstrain of the object.
Not implemented here.
result_constrain(self) ResultConstrain[source]#
Description
Returns the ResultConstrain of the object.
Not implemented here.
class DelayedProcess(*args: 'Any', **kwargs: 'Any')[source]#

Bases: Process, DelayedConstrained

add_intermediate_results()[source]#
add_results()[source]#
added_data(queries, results)[source]#
added_intermediate_data(queries, results)[source]#
data_source: DataSource = NOTSET#
delayed_constrain(self) ResultConstrain[source]#
Description
Returns the ResultConstrain of the object.
Not implemented here.
delayed_query() Tuple[NDArray[Shape['data_nr, ... query_shape'], Number], NDArray[Shape['data_nr, ... result_shape'], Number]][source]#
has_new_data: bool = False#
post_init(self) None[source]#
Description
Runs after __post_init__; does nothing here.
query(self, queries) data_points[source]#
Description
Takes a list of queries and returns the queries together with their associated results.
Parameters:

queries – A list of queries

Returns:

A tuple of queries and their results

Return type:

Tuple[NDArray, NDArray]

query_constrain(self) QueryConstrain[source]#
Description
Returns the QueryConstrain of the object.
Not implemented here.
ready: bool = True#
result_constrain(self) ResultConstrain[source]#
Description
Returns the ResultConstrain of the object.
Not implemented here.
step(self, iteration)[source]#
Description
Advances the time (of its time source) by ``iteration`` steps and returns the new data.
Parameters:

iteration (int) – Number of steps to advance

Returns:

Processed Queries before step, Results before step, Processed Queries after step, Results after step

Return type:

NDArray, NDArray, NDArray, NDArray

update()[source]#
class DelayedStreamProcess(*args: 'Any', **kwargs: 'Any')[source]#

Bases: DelayedProcess, StreamProcess

class IntegratingDSProcess(*args: 'Any', **kwargs: 'Any')[source]#

Bases: DelayedStreamProcess

added_data(queries, results)[source]#
added_intermediate_data(queries, results)[source]#
delayed_query() Tuple[NDArray[Shape['data_nr, ... query_shape'], Number], NDArray[Shape['data_nr, ... result_shape'], Number]][source]#
end_time: float = 0.0#
integrated_result: NDArray[Shape['data_nr, ... output_shape'], Number] | None = None#
integration_time: float = 4#
sliding_window: NDArray[Shape['data_nr, ... output_shape'], Number] | None = None#
start_time: float = 0.0#
update(self, subscription) None[source]#
Description
Tries to update the given subscription.
Abstract Method
Parameters:

subscription (Subscribable) – The subscription to be updated

class StreamProcess(stop_time, time_behaviour, data_pools)[source]#

Bases: Process, TimeSubscriber

Description
StreamProcess is a simple stream based Process.
post_init(self) None[source]#
Description
Initializes its TimeDataSource and StreamDataPools.
stop_time: float = 1000#
time_behavior: TimeDataSource = NOTSET#
time_update(self, subscription) None[source]#
Description
Tries to update the given subscription.
Parameters:

subscription (Subscribable) – The subscription to be updated

class WindowDSProcess(*args: 'Any', **kwargs: 'Any')[source]#

Bases: DelayedStreamProcess

added_intermediate_data(queries, results)[source]#
delayed_query() Tuple[NDArray[Shape['data_nr, ... query_shape'], Number], NDArray[Shape['data_nr, ... result_shape'], Number]][source]#
post_init(self) None[source]#
Description
Initializes its TimeDataSource and StreamDataPools.
update(self, subscription) None[source]#
Description
Tries to update the given subscription.
Abstract Method
Parameters:

subscription (Subscribable) – The subscription to be updated

window_size: float = 4#