Process (Implementation)#

alts.modules.data_process.process
class DataSourceProcess(data_source)[source]#

Bases: Process, ProcessOracleSubscriber

Description
DataSourceProcess is a process specifically for data sources.
Parameters:

data_source (DataSource) – A DataSource containing all data points

data_source: DataSource = NOTSET#
post_init(self) None[source]#
Description
Initializes its DataSource, POracles and ResultDataPools.
Raises:

TypeError – If the data pools are not of type ResultDataPools or the oracles are not of type POracles
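
The kind of type validation described for post_init can be sketched as follows. This is only an illustration under stated assumptions: `ToyResultDataPools`, `ToyPOracles`, `ToyValidatedProcess` and `is_valid` are hypothetical names that do not exist in alts, and the real check may be structured differently.

```python
class ToyResultDataPools:
    """Hypothetical stand-in for the expected data pools type."""


class ToyPOracles:
    """Hypothetical stand-in for the expected oracles type."""


class ToyValidatedProcess:
    """Hypothetical process that validates its collaborators after init."""

    def __init__(self, data_pools, oracles) -> None:
        self.data_pools = data_pools
        self.oracles = oracles
        self.post_init()

    def post_init(self) -> None:
        # Reject collaborators of the wrong type early, before any query runs.
        if not isinstance(self.data_pools, ToyResultDataPools):
            raise TypeError("data_pools must be a ToyResultDataPools")
        if not isinstance(self.oracles, ToyPOracles):
            raise TypeError("oracles must be a ToyPOracles")


def is_valid(data_pools, oracles) -> bool:
    """Return True if a process can be built from the given collaborators."""
    try:
        ToyValidatedProcess(data_pools, oracles)
    except TypeError:
        return False
    return True
```

Failing in post_init rather than at the first query keeps configuration mistakes close to where the process is constructed.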

process_query(self, subscription) None[source]#
Description
Pops all processed queries in the oracle and adds them to its own data pools.
Parameters:

subscription (Any) – Unused

query(self, queries) data_points[source]#
Description
Returns all given queries with their associated results from the data source.
Parameters:

queries (NDArray) – Requested queries

Returns:

Requested queries and their associated results.

Return type:

Tuple[NDArray, NDArray]
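
The return contract of query, the requested queries paired with their results, can be illustrated with a minimal sketch that does not depend on the library. `ToyDataSource` and `ToyQueryProcess` are hypothetical stand-ins, not alts classes, and plain Python lists replace NDArrays to keep the example free of dependencies.

```python
from typing import Callable, List, Tuple

Queries = List[float]
Results = List[float]


class ToyDataSource:
    """Hypothetical data source: maps each query to a result via a function."""

    def __init__(self, fn: Callable[[float], float]) -> None:
        self.fn = fn

    def query(self, queries: Queries) -> Tuple[Queries, Results]:
        return queries, [self.fn(q) for q in queries]


class ToyQueryProcess:
    """Hypothetical process that forwards queries to its data source."""

    def __init__(self, data_source: ToyDataSource) -> None:
        self.data_source = data_source

    def query(self, queries: Queries) -> Tuple[Queries, Results]:
        # Return the requested queries together with their results.
        return self.data_source.query(queries)


process = ToyQueryProcess(ToyDataSource(lambda x: x * x))
queries, results = process.query([1.0, 2.0, 3.0])
```

Returning the queries alongside the results lets a caller store both as one set of data points without tracking the request separately.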

query_constrain(self) QueryConstrain[source]#
Description
Returns its own query constraints.
Returns:

Data pool’s query constraints

Return type:

QueryConstrain

result_constrain(self) ResultConstrain[source]#
Description
Returns its own result constraints.
Returns:

Data pool’s result constraints

Return type:

ResultConstrain

class DelayedProcess(data_source)[source]#

Bases: Process, DelayedConstrained

Description
DelayedProcess is a process specifically for data where queries have intermediate and time-delayed results.
Parameters:

data_source (DataSource) – A DataSource containing all data points

add_intermediate_results(self) data_points[source]#
Description
Processes the next queries and returns the intermediate results.
Returns:

Processed intermediate queries and their associated results

Return type:

Tuple[NDArray, NDArray]

add_results(self) delayed_data_points[source]#
Description
If an intermediate query has been made, retrieves the delayed queries and their results and adds them to its DataPools.
Returns:

Delayed data points

Return type:

Tuple[NDArray, NDArray]

added_data(self, queries, results) None[source]#
Description
Informs this process that all new data has been added.
Parameters:
  • queries (Any) – Unused

  • results (Any) – Unused

added_intermediate_data(self, queries, results) None[source]#
Description
Updates its last processed queries and results, and marks itself as not ready to process new queries.
Parameters:
  • queries – Last processed intermediate queries

  • results – Last processed queries’ delayed results

data_source: DataSource = NOTSET#
delayed_constrain(self) ResultConstrain[source]#
Description
Returns its own delayed result constraints.
Returns:

Data pool’s delayed result constraints

Return type:

ResultConstrain

delayed_query(self) data_points[source]#
Description
Returns the time-modified delayed queries with the delayed results.
Returns:

Delayed queries, delayed results

Return type:

Tuple[NDArray, NDArray]

has_new_data: bool = False#
post_init(self) None[source]#
Description
Initializes its DataSource, POracles and ResultDataPools.
Raises:

TypeError – If the data pools are not of type ResultDataPools or the oracles are not of type POracles

query(self, queries) data_points[source]#
Description
Queries the given queries and returns their data points.
Parameters:

queries – Requested queries

Returns:

The queried queries and their results

Return type:

Tuple[NDArray, NDArray]

query_constrain(self) QueryConstrain[source]#
Description
Returns its own query constraints.
Returns:

Data pool’s query constraints

Return type:

QueryConstrain

ready: bool = True#
result_constrain(self) ResultConstrain[source]#
Description
Returns its own result constraints.
Returns:

Data pool’s result constraints

Return type:

ResultConstrain

step(self, iteration) Tuple[data_points, delayed_data_points][source]#
Description
Processes the next queries in the queue. Returns their intermediate and delayed results.
Parameters:

iteration (Any) – Unused

Returns:

Intermediate queries, their results, Delayed queries, their results

Return type:

Tuple[Tuple[NDArray, NDArray], Tuple[NDArray, NDArray]]

update(self) data_points[source]#
Description
Re-queries the most recently queried queries, sets the `has_new_data` flag to True, and returns the data points.
Returns:

Intermediate queries, delayed results

Return type:

Tuple[NDArray, NDArray]
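
The interplay of the `ready` and `has_new_data` flags can be pictured as a small state machine. The sketch below is a simplified, hypothetical model built only from the descriptions above. `ToyDelayedProcess` is not an alts class, and resetting both flags in `added_data` is an assumption; the actual implementation may behave differently.

```python
from typing import List, Optional, Tuple


class ToyDelayedProcess:
    """Hypothetical model of the flag handling described above."""

    def __init__(self) -> None:
        self.ready = True          # may accept new queries
        self.has_new_data = False  # delayed results are available
        self.last_queries: Optional[List[float]] = None
        self.last_results: Optional[List[float]] = None

    def added_intermediate_data(self, queries, results) -> None:
        # Remember the last processed data and block new queries.
        self.last_queries = queries
        self.last_results = results
        self.ready = False

    def update(self) -> Tuple[List[float], List[float]]:
        if self.last_queries is None or self.last_results is None:
            raise ValueError("no intermediate data has been processed yet")
        # Signal that delayed results can now be collected.
        self.has_new_data = True
        return self.last_queries, self.last_results

    def added_data(self, queries=None, results=None) -> None:
        # Assumption: once all data is stored, the process is ready again.
        self.has_new_data = False
        self.ready = True


process = ToyDelayedProcess()
process.added_intermediate_data([0.5], [1.5])
state_after_intermediate = (process.ready, process.has_new_data)
delayed = process.update()
state_after_update = (process.ready, process.has_new_data)
process.added_data()
state_after_added = (process.ready, process.has_new_data)
```

In this model a process accepts one batch of queries at a time and only becomes ready again after the delayed results have been stored.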

class DelayedStreamProcess(data_source, stop_time, time_behavior)[source]#

Bases: DelayedProcess, StreamProcess

Description
A DelayedProcess for StreamProcesses.
Parameters:
  • data_source (DataSource) – A DataSource containing all data points

  • stop_time (float) – The stopping time of the experiment (default: 1000)

  • time_behavior (TimeDataSource) – A DataSource with time-dependent data

class IntegratingDSProcess(data_source, stop_time, time_behavior)[source]#

Bases: DelayedStreamProcess

Description
The Integrating Delayed Stream Process integrates its results over a given time period.
Parameters:
  • data_source (DataSource) – A DataSource containing all data points

  • stop_time (float) – The stopping time of the experiment (default: 1000)

  • time_behavior (TimeDataSource) – A DataSource with time-dependent data

  • integration_time (float) – The time period over which to integrate the results (default: 4)

added_data(self, queries, results) None[source]#
Description
Informs this process that all new data has been added and unassigns the integrated result.
Parameters:
  • queries (Any) – Unused

  • results (Any) – Unused

added_intermediate_data(self, queries, results) None[source]#
Description
Updates its last processed queries and results and marks itself as not ready to process new queries. Also sets the start time and resets the integrated result to a zero matrix.
Parameters:
  • queries – Last processed intermediate queries

  • results – Last processed queries’ delayed results

delayed_query(self) data_points[source]#
Description
Returns the time-modified delayed queries and the unchanged integrated result.
Returns:

Delayed queries, integrated results

Return type:

Tuple[NDArray, NDArray]

Raises:

ValueError – If integrated_result is None

end_time: float = 0.0#
integrated_result: NDArray[Shape['data_nr, ... output_shape'], Number] | None = None#
integration_time: float = 4#
sliding_window: NDArray[Shape['data_nr, ... output_shape'], Number] | None = None#
start_time: float = 0.0#
update(self) data_points[source]#
Description
Re-queries the most recently queried queries. If the integration time has passed, records the end time and sets the `has_new_data` flag to True. Adds the result to the integrated result and returns the data points.
Returns:

Intermediate queries, delayed results

Return type:

Tuple[NDArray, NDArray]
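
The integration behavior described for IntegratingDSProcess (zero the accumulator when a period starts, add each new result, flag completion once the integration time has passed) can be sketched as follows. `ToyIntegrator` and its `start` method are hypothetical and not part of alts, plain lists replace NDArrays, and comparing the elapsed time with `>=` is an assumption.

```python
from typing import List, Optional


class ToyIntegrator:
    """Hypothetical accumulator mirroring the integration steps described above."""

    def __init__(self, integration_time: float = 4.0) -> None:
        self.integration_time = integration_time
        self.start_time = 0.0
        self.end_time = 0.0
        self.has_new_data = False
        self.integrated_result: Optional[List[float]] = None

    def start(self, now: float, size: int) -> None:
        # Open a new integration period with a zeroed accumulator.
        self.start_time = now
        self.integrated_result = [0.0] * size
        self.has_new_data = False

    def update(self, now: float, result: List[float]) -> None:
        if self.integrated_result is None:
            raise ValueError("integration has not been started")
        # Add the latest result element-wise to the running total.
        self.integrated_result = [
            total + value for total, value in zip(self.integrated_result, result)
        ]
        # Flag the data as complete once the integration time has passed.
        if now - self.start_time >= self.integration_time:
            self.end_time = now
            self.has_new_data = True


integrator = ToyIntegrator(integration_time=2.0)
integrator.start(now=0.0, size=2)
for t in (1.0, 2.0):
    integrator.update(now=t, result=[1.0, 0.5])
```

Here the accumulated result becomes available only after the full integration period, in contrast to a sliding window, which yields a value on every update.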

class StreamProcess(stop_time, time_behavior)[source]#

Bases: Process, TimeSubscriber

Description
StreamProcess is a process for data streams.
Parameters:
  • stop_time (float) – The stopping time of the experiment (default: 1000)

  • time_behavior (TimeDataSource) – A DataSource with time-dependent data

post_init(self) None[source]#
Description
Initializes its TimeDataSource and StreamDataPools.
Raises:

TypeError – If the data pools are not of type StreamDataPools

stop_time: float = 1000#
time_behavior: TimeDataSource = NOTSET#
time_update(self, subscription) times, vars[source]#
Description
Returns the current time and its corresponding result.
Parameters:

subscription (Any) – Unused

Returns:

Current time, Current result

Return type:

Tuple[NDArray, NDArray]
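
As a rough illustration of time_update and stop_time, the sketch below samples a time-dependent signal until a stopping time is reached. `ToyStream`, its `advance` and `finished` methods, and the fixed step size are hypothetical and not taken from alts; how the real process advances time and ends the experiment is not specified here.

```python
from typing import Callable, List, Tuple


class ToyStream:
    """Hypothetical stream: a clock plus a time-dependent signal."""

    def __init__(
        self,
        signal: Callable[[float], float],
        stop_time: float = 1000.0,
        step: float = 1.0,
    ) -> None:
        self.signal = signal
        self.stop_time = stop_time
        self.step = step
        self.now = 0.0

    def finished(self) -> bool:
        # Assumption: the experiment ends once the clock reaches stop_time.
        return self.now >= self.stop_time

    def advance(self) -> None:
        self.now += self.step

    def time_update(self) -> Tuple[List[float], List[float]]:
        # Return the current time together with the signal value at that time.
        return [self.now], [self.signal(self.now)]


stream = ToyStream(signal=lambda t: 2.0 * t, stop_time=3.0)
samples = []
while not stream.finished():
    samples.append(stream.time_update())
    stream.advance()
```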

class WindowDSProcess(data_source, stop_time, time_behavior, window_size)[source]#

Bases: DelayedStreamProcess

Description
The Window Delayed Stream Process integrates the results over a window of a given size.
It is similar to IntegratingDSProcess, which opens a new window and fills it. Here, however, the window slides forward with each new query and is therefore always full.
Parameters:
  • data_source (DataSource) – A DataSource containing all data points

  • stop_time (float) – The stopping time of the experiment (default: 1000)

  • time_behavior (TimeDataSource) – A DataSource with time-dependent data

  • window_size (float) – The size of the integrating window (default: 4)

added_intermediate_data(self, queries, results) None[source]#
Description
Updates its last processed queries and results, and marks itself as not ready to process new queries.
Also removes the first query from the sliding query window.
Parameters:
  • queries – Last processed intermediate queries

  • results – Last processed queries’ delayed results

delayed_query(self) data_points[source]#
Description
Returns the time-modified delayed queries with the sum of the sliding result window.
Returns:

Delayed queries, Sliding result window sum

Return type:

Tuple[NDArray, NDArray]

post_init(self) None[source]#
Description
Initializes its DataSource, POracles, ResultDataPools and sliding windows.
Raises:

TypeError – If the data pools are not of type ResultDataPools or the oracles are not of type POracles

update(self) data_points[source]#
Description
Re-queries the most recently queried queries, sets the `has_new_data` flag to True, and returns the data points.
Also updates the sliding query and result windows.
Returns:

Intermediate queries, delayed results

Return type:

Tuple[NDArray, NDArray]

window_size: float = 4#
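
The sliding-window idea behind WindowDSProcess, a window that is always full and whose sum is reported, can be sketched with a bounded deque. `ToySlidingWindow` is hypothetical and not part of alts. Treating the window size as a number of entries, pre-filling the window with zeros, and using plain lists instead of NDArrays are all assumptions made for this illustration.

```python
from collections import deque
from typing import Deque, List


class ToySlidingWindow:
    """Hypothetical fixed-length window over result vectors."""

    def __init__(self, window_size: int = 4, width: int = 1) -> None:
        # Pre-fill with zeros so the window is full from the start.
        self.window: Deque[List[float]] = deque(
            ([0.0] * width for _ in range(window_size)), maxlen=window_size
        )

    def push(self, result: List[float]) -> None:
        # Appending to a full deque drops the oldest entry automatically.
        self.window.append(list(result))

    def total(self) -> List[float]:
        # Element-wise sum over all entries currently in the window.
        return [sum(column) for column in zip(*self.window)]


window = ToySlidingWindow(window_size=3, width=1)
window.push([1.0])
window.push([2.0])
partial_sum = window.total()
window.push([3.0])
window.push([4.0])
full_sum = window.total()
```

Because the oldest entry leaves as each new one arrives, a sum is available after every update instead of only at the end of an integration period.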