# Source code for alts.core.subscriber

#Version 1.1.1 conform as of 16.12.2024
"""
| *alts.core.subscriber*
"""
from __future__ import annotations
from abc import abstractmethod
from typing import TYPE_CHECKING
from alts.core.configuration import Required, post_init

from alts.core.configuration import Configurable

if TYPE_CHECKING:
    from typing import Tuple, Callable
    from typing_extensions import Self #type: ignore
    from nptyping import NDArray, Number, Shape
    from alts.core.data.data_pools import DataPools, StreamDataPools, ProcessDataPools, ResultDataPools
    from alts.core.data_process.time_source import TimeSource
    from alts.core.subscribable import Subscribable
    from alts.core.oracle.oracles import Oracles, POracles
    from alts.core.experiment_modules import ExperimentModules




class Subscriber(Configurable):
    """
    Subscriber()
    | **Description**
    |   Base class for objects that register themselves with a ``Subscribable``
    |   and react to it through :meth:`update`.
    """

    def post_init(self):
        """
        post_init(self) -> None
        | **Description**
        |   Runs the parent ``post_init`` first and then calls :meth:`subscribe`,
        |   so the subscription is set up as part of post-initialization.
        """
        super().post_init()
        self.subscribe()

    @abstractmethod
    def update(self, subscription: Subscribable) -> None:
        """
        update(self, subscription) -> None
        | **Description**
        |   Reacts to the given ``subscription``. The ``*_update`` / ``process_query``
        |   callbacks of the subclasses in this module forward to this method.
        |   Abstract Method

        :param subscription: The subscription to react to
        :type subscription: Subscribable
        """

    @abstractmethod
    def subscribe(self) -> None:
        """
        subscribe(self) -> None
        | **Description**
        |   Base step of subscribing. It registers nothing itself and only prints
        |   which class subscribed; overriding subclasses in this module call it
        |   through ``super().subscribe()`` before registering their callback.
        |   Abstract Method
        """
        # Start of the console message; subclasses print the "to ..." part.
        print(f"{self.__class__} subscribed...")
class DataPoolsSubscriber(Subscriber):
    """
    DataPoolsSubscriber()
    | **Description**
    |   Common base for Subscribers that work on ``DataPools``. It only declares
    |   the ``data_pools`` attribute; ``subscribe`` and ``update`` are left to
    |   its subclasses.
    """

    # The data pools whose members the concrete subclasses subscribe to.
    data_pools: DataPools
class StreamDataSubscriber(DataPoolsSubscriber):
    """
    StreamDataSubscriber()
    | **Description**
    |   A Subscriber that registers on the ``stream`` member of its
    |   ``StreamDataPools``.
    """

    # Data pools providing the ``stream`` subscribable.
    data_pools: StreamDataPools

    def stream_update(self, subscription: Subscribable):
        """
        stream_update(self, subscription) -> None
        | **Description**
        |   Callback registered on the stream data pool; forwards the given
        |   ``subscription`` to :meth:`update`.

        :param subscription: The subscription passed on to ``update``
        :type subscription: Subscribable
        """
        self.update(subscription)

    def subscribe(self) -> None:
        """
        subscribe(self) -> None
        | **Description**
        |   Runs the parent ``subscribe`` and then registers :meth:`stream_update`
        |   on ``data_pools.stream``.
        """
        super().subscribe()
        stream = self.data_pools.stream
        stream.subscribe(self, self.stream_update)  # type: ignore
        # Completes the console message started by the parent ``subscribe``.
        print(f"to {self.data_pools.stream.__class__}")
class ProcessDataSubscriber(DataPoolsSubscriber):
    """
    ProcessDataSubscriber()
    | **Description**
    |   A Subscriber that registers on the ``process`` member of its
    |   ``ProcessDataPools``.
    """

    # Data pools providing the ``process`` subscribable.
    data_pools: ProcessDataPools

    def process_update(self, subscription: Subscribable):
        """
        process_update(self, subscription) -> None
        | **Description**
        |   Callback registered on the process data pool; forwards the given
        |   ``subscription`` to :meth:`update`.

        :param subscription: The subscription passed on to ``update``
        :type subscription: Subscribable
        """
        self.update(subscription)

    def subscribe(self) -> None:
        """
        subscribe(self) -> None
        | **Description**
        |   Runs the parent ``subscribe`` and then registers :meth:`process_update`
        |   on ``data_pools.process``.
        """
        super().subscribe()
        process = self.data_pools.process
        process.subscribe(self, self.process_update)  # type: ignore
        # Completes the console message started by the parent ``subscribe``.
        print(f"to {self.data_pools.process.__class__}")
class ResultDataSubscriber(DataPoolsSubscriber):
    """
    ResultDataSubscriber()
    | **Description**
    |   A Subscriber that registers on the ``result`` member of its
    |   ``ResultDataPools``.
    """

    # Data pools providing the ``result`` subscribable.
    data_pools: ResultDataPools

    def result_update(self, subscription: Subscribable):
        """
        result_update(self, subscription) -> None
        | **Description**
        |   Callback registered on the result data pool; forwards the given
        |   ``subscription`` to :meth:`update`.

        :param subscription: The subscription passed on to ``update``
        :type subscription: Subscribable
        """
        self.update(subscription)

    def subscribe(self) -> None:
        """
        subscribe(self) -> None
        | **Description**
        |   Runs the parent ``subscribe`` and then registers :meth:`result_update`
        |   on ``data_pools.result``.
        """
        super().subscribe()
        result = self.data_pools.result
        result.subscribe(self, self.result_update)  # type: ignore
        # Completes the console message started by the parent ``subscribe``.
        print(f"to {self.data_pools.result.__class__}")
class ExpModSubscriber(Subscriber):
    """
    ExpModSubscriber()
    | **Description**
    |   A Subscriber that registers directly on its ``ExperimentModules``.
    """

    # Declared via ``post_init()`` from ``alts.core.configuration``.
    exp_modules: ExperimentModules = post_init()

    def experiment_update(self, subscription: Subscribable):
        """
        experiment_update(self, subscription) -> None
        | **Description**
        |   Callback registered on the experiment modules; forwards the given
        |   ``subscription`` to :meth:`update`.

        :param subscription: The subscription passed on to ``update``
        :type subscription: Subscribable
        """
        self.update(subscription)

    def subscribe(self) -> None:
        """
        subscribe(self) -> None
        | **Description**
        |   Runs the parent ``subscribe`` and then registers
        |   :meth:`experiment_update` on its ``ExperimentModules``.
        """
        super().subscribe()
        modules = self.exp_modules
        modules.subscribe(self, self.experiment_update)  # type: ignore
        # Completes the console message started by the parent ``subscribe``.
        print(f"to {self.exp_modules.__class__}")
class TimeSubscriber(Subscriber):
    """
    TimeSubscriber()
    | **Description**
    |   A Subscriber that registers directly on its ``TimeSource``.
    """

    # Declared via ``post_init()`` from ``alts.core.configuration``.
    time_source: TimeSource = post_init()

    def time_update(self, subscription: Subscribable):
        """
        time_update(self, subscription) -> None
        | **Description**
        |   Callback registered on the time source; forwards the given
        |   ``subscription`` to :meth:`update`.

        :param subscription: The subscription passed on to ``update``
        :type subscription: Subscribable
        """
        self.update(subscription)

    def subscribe(self) -> None:
        """
        subscribe(self) -> None
        | **Description**
        |   Runs the parent ``subscribe`` and then registers :meth:`time_update`
        |   on its ``TimeSource``.
        """
        super().subscribe()
        source = self.time_source
        source.subscribe(self, self.time_update)  # type: ignore
        # Completes the console message started by the parent ``subscribe``.
        print(f"to {self.time_source.__class__}")
class OraclesSubscriber(Subscriber):
    """
    OraclesSubscriber()
    | **Description**
    |   Common base for Subscribers that work on ``Oracles``. It only declares
    |   the ``oracles`` attribute; ``subscribe`` and ``update`` are left to
    |   its subclasses.
    """

    # The oracles whose members the concrete subclasses subscribe to.
    oracles: Oracles
class ProcessOracleSubscriber(OraclesSubscriber):
    """
    ProcessOracleSubscriber()
    | **Description**
    |   A Subscriber that registers on the ``process`` member of its
    |   ``POracles``.
    """

    # Oracles providing the ``process`` subscribable.
    oracles: POracles

    def process_query(self, subscription: Subscribable):
        """
        process_query(self, subscription) -> None
        | **Description**
        |   Callback registered on the process oracle; forwards the given
        |   ``subscription`` to :meth:`update`.

        :param subscription: The subscription passed on to ``update``
        :type subscription: Subscribable
        """
        self.update(subscription)

    def subscribe(self) -> None:
        """
        subscribe(self) -> None
        | **Description**
        |   Runs the parent ``subscribe`` and then registers :meth:`process_query`
        |   on ``oracles.process``.
        """
        super().subscribe()
        process_oracle = self.oracles.process
        process_oracle.subscribe(self, self.process_query)  # type: ignore
        # Completes the console message started by the parent ``subscribe``.
        print(f"to {self.oracles.process.__class__}")