Process (Implementation)
- class DataSourceProcess(data_source)
Bases: Process, ProcessOracleSubscriber
Description: DataSourceProcess is a process specifically for data sources.
- Parameters:
data_source (DataSource) – A DataSource containing all data points
- data_source: DataSource = NOTSET
- post_init(self) → None
- Description: Initializes its DataSource, POracles and ResultDataPools.
- Raises:
TypeError – If the DataPools is not a ResultDataPools or the oracles are not POracles
- process_query(self, subscription) → None
- Description: Pops all processed queries in the oracle and adds them to its own data pools.
- Parameters:
subscription (Any) – Unused
- query(self, queries) → data_points
- Description: Returns all given queries with their associated results from the data source.
- Parameters:
queries (NDArray) – Requested queries
- Returns:
Requested queries and their associated results.
- Return type:
Tuple[NDArray, NDArray]
- query_constrain(self) → QueryConstrain
- Description: Returns its own query constraints.
- Returns:
Data pool’s query constraints
- Return type:
QueryConstrain
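The contract of `query` (queries in, a `(queries, results)` tuple out) can be sketched without the library. `ToyDataSource` and `ToyDataSourceProcess` below are hypothetical stand-ins, not the library's classes, and the squaring rule is an arbitrary assumption; oracles, data pools and constraints are left out.

```python
from typing import Tuple

import numpy as np
from numpy.typing import NDArray


class ToyDataSource:
    """Hypothetical stand-in for a DataSource: maps each query x to x**2."""

    def query(self, queries: NDArray) -> Tuple[NDArray, NDArray]:
        results = queries ** 2
        return queries, results


class ToyDataSourceProcess:
    """Hypothetical stand-in that only forwards queries to its data source."""

    def __init__(self, data_source: ToyDataSource) -> None:
        self.data_source = data_source

    def query(self, queries: NDArray) -> Tuple[NDArray, NDArray]:
        # Return the requested queries together with their results.
        return self.data_source.query(queries)


process = ToyDataSourceProcess(ToyDataSource())
queries, results = process.query(np.array([[1.0], [2.0], [3.0]]))
```

Both returned arrays share the leading dimension, one row per requested query.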
- class DelayedProcess(data_source)
Bases: Process, DelayedConstrained
Description: DelayedProcess is a process specifically for data where queries have intermediate and time-delayed results.
- Parameters:
data_source (DataSource) – A DataSource containing all data points
- add_intermediate_results(self) → data_points
- Description: Processes the next queries and returns the intermediate results.
- Returns:
Processed intermediate queries and their associated results
- Return type:
Tuple[NDArray, NDArray]
- add_results(self) → delayed_data_points
- Description: If an intermediate query has been made, queries the delayed queries and results and adds them to its DataPools.
- Returns:
Delayed data points
- Return type:
Tuple[NDArray, NDArray]
- added_data(self, queries, results) → None
- Description: Informs this process that all new data has been added.
- Parameters:
queries (Any) – Unused
results (Any) – Unused
- added_intermediate_data(self, queries, results) → None
- Description: Updates its last processed queries and results, and sets its status to not ready to process new queries.
- Parameters:
queries – Last processed intermediate queries
results – Last processed queries’ delayed results
- data_source: DataSource = NOTSET
- delayed_constrain(self) → ResultConstrain
- Description: Returns its own delayed result constraints.
- Returns:
Data pool’s delayed result constraints
- Return type:
ResultConstrain
- delayed_query(self) → data_points
- Description: Returns the time-modified delayed queries with the delayed results.
- Returns:
Delayed queries, delayed results
- Return type:
Tuple[NDArray, NDArray]
- has_new_data: bool = False
- post_init(self) → None
- Description: Initializes its DataSource, POracles and ResultDataPools.
- Raises:
TypeError – If the DataPools is not a ResultDataPools or the oracles are not POracles
- query(self, queries) → data_points
- Description: Queries the given queries and returns their data points.
- Parameters:
queries – Requested queries
- Returns:
The queried queries and their results
- Return type:
Tuple[NDArray, NDArray]
- query_constrain(self) → QueryConstrain
- Description: Returns its own query constraints.
- Returns:
Data pool’s query constraints
- Return type:
QueryConstrain
- ready: bool = True
- result_constrain(self) → ResultConstrain
- Description: Returns its own result constraints.
- Returns:
Data pool’s result constraints
- Return type:
ResultConstrain
- step(self, iteration) → Tuple[data_points, delayed_data_points]
- Description: Processes the next queries in the queue and returns their intermediate and delayed results.
- Parameters:
iteration (Any) – Unused
- Returns:
Intermediate queries and their results, delayed queries and their results
- Return type:
Tuple[Tuple[NDArray, NDArray], Tuple[NDArray, NDArray]]
- update(self) → data_points
- Description: Queries the last queries again, sets the `has_new_data` flag to True and returns the data points.
- Returns:
Intermediate queries, delayed results
- Return type:
Tuple[NDArray, NDArray]
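The interplay of the `ready` and `has_new_data` flags described above can be sketched as a small state machine. `ToyDelayedProcess` is a hypothetical stand-in, not the library's class. Two points are assumptions rather than documented behavior: `added_data` resetting both flags to their defaults, and `update` returning the stored results unchanged instead of querying a data source again.

```python
from typing import Optional, Tuple

import numpy as np
from numpy.typing import NDArray

DataPoints = Tuple[NDArray, NDArray]


class ToyDelayedProcess:
    """Hypothetical stand-in mirroring the documented flags."""

    def __init__(self) -> None:
        self.ready = True
        self.has_new_data = False
        self.last_queries: Optional[NDArray] = None
        self.last_results: Optional[NDArray] = None

    def added_intermediate_data(self, queries: NDArray, results: NDArray) -> None:
        # Documented: remember the last processed data and block new queries.
        self.last_queries = queries
        self.last_results = results
        self.ready = False

    def update(self) -> DataPoints:
        # Documented: flag new data and return the data points.
        # Assumption: the stored results are returned unchanged.
        if self.last_queries is None or self.last_results is None:
            raise ValueError("no intermediate data has been added yet")
        self.has_new_data = True
        return self.last_queries, self.last_results

    def added_data(self, queries=None, results=None) -> None:
        # Assumption: once all data is added, the flags return to their defaults.
        self.has_new_data = False
        self.ready = True


process = ToyDelayedProcess()
process.added_intermediate_data(np.array([[0.5]]), np.array([[2.0]]))
state_after_intermediate = (process.ready, process.has_new_data)
_, delayed = process.update()
state_after_update = (process.ready, process.has_new_data)
process.added_data()
state_after_added = (process.ready, process.has_new_data)
```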
- class DelayedStreamProcess(data_source, stop_time, time_behavior)
Bases: DelayedProcess, StreamProcess
Description: A DelayedProcess for StreamProcesses.
- Parameters:
data_source (DataSource) – A DataSource containing all data points
stop_time (float) – The stopping time of the experiment (default = 1000)
time_behavior (TimeDataSource) – A DataSource with time-dependent data
- class IntegratingDSProcess(data_source, stop_time, time_behavior)
Bases: DelayedStreamProcess
Description: The Integrating Delayed Stream Process integrates its results over a given time period.
- Parameters:
data_source (DataSource) – A DataSource containing all data points
stop_time (float) – The stopping time of the experiment (default = 1000)
time_behavior (TimeDataSource) – A DataSource with time-dependent data
integration_time (float) – The time period to integrate the results over (default = 4)
- added_data(self, queries, results) → None
- Description: Informs this process that all new data has been added and unassigns the integrated result.
- Parameters:
queries (Any) – Unused
results (Any) – Unused
- added_intermediate_data(self, queries, results) → None
- Description: Updates its last processed queries and results, sets its status to not ready to process new queries, sets the start time and sets the integrated result to a zero matrix.
- Parameters:
queries – Last processed intermediate queries
results – Last processed queries’ delayed results
- delayed_query(self) → data_points
- Description: Returns the time-modified delayed queries and the unchanged integrated result.
- Returns:
Delayed queries, integrated results
- Return type:
Tuple[NDArray, NDArray]
- Raises:
ValueError – If integrated_result is None
- end_time: float = 0.0
- integrated_result: NDArray[Shape['data_nr, ... output_shape'], Number] | None = None
- integration_time: float = 4
- sliding_window: NDArray[Shape['data_nr, ... output_shape'], Number] | None = None
- start_time: float = 0.0
- update(self) → data_points
- Description: Queries the last queries again. If the integration time has passed, sets the end time and sets the `has_new_data` flag to True. Adds the result to the integrated result and returns the data points.
- Returns:
Intermediate queries, delayed results
- Return type:
Tuple[NDArray, NDArray]
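The integration bookkeeping described for IntegratingDSProcess can be sketched as follows. `ToyIntegrator` is a hypothetical stand-in, not the library's class. It assumes the current time is passed in explicitly as `now`, that integration is a plain running sum of results, and that the period counts as passed once `now - start_time >= integration_time`; none of these details are confirmed by the reference above.

```python
from typing import Optional

import numpy as np
from numpy.typing import NDArray


class ToyIntegrator:
    """Hypothetical stand-in for the integration bookkeeping."""

    def __init__(self, integration_time: float = 4.0) -> None:
        self.integration_time = integration_time
        self.start_time = 0.0
        self.end_time = 0.0
        self.has_new_data = False
        self.integrated_result: Optional[NDArray] = None

    def added_intermediate_data(self, results: NDArray, now: float) -> None:
        # Documented: set the start time and start from a zero matrix.
        self.start_time = now
        self.integrated_result = np.zeros_like(results, dtype=float)

    def update(self, results: NDArray, now: float) -> None:
        # Documented: add the result; flag new data once the time has passed.
        if self.integrated_result is None:
            raise ValueError("integrated_result is None")
        self.integrated_result = self.integrated_result + results
        if now - self.start_time >= self.integration_time:  # assumed comparison
            self.end_time = now
            self.has_new_data = True

    def delayed_result(self) -> NDArray:
        # Documented: raise ValueError if nothing has been integrated.
        if self.integrated_result is None:
            raise ValueError("integrated_result is None")
        return self.integrated_result

    def added_data(self) -> None:
        # Documented: unassign the integrated result.
        self.integrated_result = None


integrator = ToyIntegrator(integration_time=4.0)
integrator.added_intermediate_data(np.array([[1.0]]), now=0.0)
for now in (1.0, 2.0, 3.0, 4.0):
    integrator.update(np.array([[1.0]]), now=now)
total = integrator.delayed_result()
```

Calling `delayed_result` after `added_data` raises `ValueError`, matching the documented behavior of `delayed_query` when `integrated_result` is None.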
- class StreamProcess(stop_time, time_behavior)
Bases: Process, TimeSubscriber
Description: StreamProcess is a process for data streams.
- Parameters:
stop_time (float) – The stopping time of the experiment (default = 1000)
time_behavior (TimeDataSource) – A DataSource with time-dependent data
- post_init(self) → None
- Description: Initializes its TimeDataSource and StreamDataPools.
- Raises:
TypeError – If the DataPools is not a StreamDataPools
- stop_time: float = 1000
- time_behavior: TimeDataSource = NOTSET
- class WindowDSProcess(data_source, stop_time, time_behavior, window_size)
Bases: DelayedStreamProcess
Description: The Window Delayed Stream Process integrates the results over a given window size. It is similar to IntegratingDSProcess, where a new window is opened and filled, except that here the window slides with each new query and is therefore always full.
- Parameters:
data_source (DataSource) – A DataSource containing all data points
stop_time (float) – The stopping time of the experiment (default = 1000)
time_behavior (TimeDataSource) – A DataSource with time-dependent data
window_size (float) – The size of the integrating window (default = 4)
- added_intermediate_data(self, queries, results) → None
- Description: Updates its last processed queries and results, and sets its status to not ready to process new queries. Also removes the first query of the sliding query window.
- Parameters:
queries – Last processed intermediate queries
results – Last processed queries’ delayed results
- delayed_query(self) → data_points
- Description: Returns the time-modified delayed queries with the sum of the sliding result window.
- Returns:
Delayed queries, sliding result window sum
- Return type:
Tuple[NDArray, NDArray]
- post_init(self) → None
- Description: Initializes its DataSource, POracles, ResultDataPools and sliding windows.
- Raises:
TypeError – If the DataPools is not a ResultDataPools or the oracles are not POracles
- update(self) → data_points
- Description: Queries the last queries again, sets the `has_new_data` flag to True and returns the data points. Also updates the sliding query and result windows.
- Returns:
Intermediate queries, delayed results
- Return type:
Tuple[NDArray, NDArray]
- window_size: float = 4
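The difference from IntegratingDSProcess, a window that slides instead of being reopened, can be sketched with a bounded deque. `ToySlidingWindow` is a hypothetical stand-in, not the library's class. It assumes the window holds a fixed number of results, whereas the documented `window_size` is a float and may be measured in time, and it starts empty rather than pre-filled.

```python
from collections import deque
from typing import Deque

import numpy as np
from numpy.typing import NDArray


class ToySlidingWindow:
    """Hypothetical stand-in for the sliding result window."""

    def __init__(self, window_size: int = 4) -> None:
        # Assumption: the window holds a fixed number of results.
        self.results: Deque[NDArray] = deque(maxlen=window_size)

    def update(self, result: NDArray) -> None:
        # A full deque drops its oldest entry when a new one is appended.
        self.results.append(result)

    def window_sum(self) -> NDArray:
        # Sum of every result currently inside the window.
        return np.sum(np.stack(list(self.results)), axis=0)


window = ToySlidingWindow(window_size=3)
sums = []
for value in (1.0, 2.0, 3.0, 4.0, 5.0):
    window.update(np.array([value]))
    sums.append(window.window_sum().tolist())
```

Once three results are in the window, each new result replaces the oldest one, so the sum always covers the latest three values.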