Source code for alts.core.data.data_pools

#Version 1.1.1 conform as of 29.11.2024
"""
*alts.core.data.data_pools*
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any

from dataclasses import dataclass
from typing_extensions import Self
from alts.core.data.queried_data_pool import QueriedDataPool
from alts.core.configuration import Configurable, is_set, init, post_init, pre_init

[docs] class DataPools(Configurable): """ DataPools() | **Description** | A ``DataPools`` stores all queries and results that have been supplied to it. """
[docs] def trigger_subscriber(self): """ trigger_subscriber() -> None | **Description** | Updates its own available data. """ pass
def __call__(self, *args: Any, **kwds: Any) -> Self: """ __call__(self, any) -> Self | **Description** | Returns a new instance of itself. :param argsKeywords: Any parameters are being accepted but ignored :type argsKeywords: Any :return: An instance of this object :rtype: DataPools """ return self
[docs] @dataclass class StreamDataPools(DataPools): """ StreamDataPools(stream) | **Description** | A ``DataPools`` specifically for streams. :param stream: The stream to be pooled :type stream: :doc:`QueriedDataPool </core/data/queried_data_pool>` """ stream: QueriedDataPool = init()
[docs] def trigger_subscriber(self): """ trigger_subscriber(self) -> None | **Description** | Updates its own available data stream. """ super().trigger_subscriber() self.stream.update()
[docs] @dataclass class ResultDataPools(DataPools): """ ResultDataPools(result) | **Description** | A ``DataPools`` specifically for results. :param result: The results to be pooled :type result: :doc:`QueriedDataPool </core/data/queried_data_pool>` """ result: QueriedDataPool = init()
[docs] def trigger_subscriber(self): """ trigger_subscriber(self) -> None | **Description** | Updates its own available data. """ super().trigger_subscriber() self.result.update()
[docs] @dataclass class ProcessDataPools(DataPools): """ ProcessDataPools(process) | **Description** | A ``DataPools`` specifically for results. :param process: The process to be pooled :type process: :doc:`QueriedDataPool </core/data/queried_data_pool>` """ process: QueriedDataPool = init()
[docs] def trigger_subscriber(self): """ trigger_subscriber(self) -> None | **Description** | Updates its own available data. """ super().trigger_subscriber() self.process.update()
[docs] @dataclass class PRDataPools(ResultDataPools, ProcessDataPools): """ PRDataPools() | **Description** | A ``DataPools`` specifically for process results. """ pass
[docs] @dataclass class SPRDataPools(StreamDataPools, PRDataPools): """ SPRDataPools() | **Description** | A ``DataPools`` specifically for stream process results. """ pass