Source code for alts.modules.evaluator

#Version 1.1.1 conform as of 23.04.2025
"""
| *alts.modules.evaluator*
| :doc:`Core Module </core/evaluator>`
"""
from __future__ import annotations
from typing import TYPE_CHECKING

from dataclasses import dataclass, field
import os
import time


from alts.core.evaluator import Evaluator, Evaluate, LogingEvaluator
from alts.core.data.data_pools import StreamDataPools, ProcessDataPools, ResultDataPools
from alts.core.oracle.oracles import POracles
from alts.core.configuration import pre_init, post_init
from alts.modules.data_process.process import DelayedProcess

import numpy as np
from matplotlib import pyplot as plot # type: ignore


if TYPE_CHECKING:
    from alts.core.experiment import Experiment
    from nptyping import  NDArray, Number, Shape
    from alts.core.oracle.data_source import DataSource


[docs] class PrintNewDataPointsEvaluator(Evaluator): """ PrintNewDataPointsEvaluator() | **Description** | This evaluator keeps track of the experiment's results. """
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to print new data points before adding them to the experiment's result data pool. | Requires the experiment's data pool to be a ResultDataPool. :param experiment: The experiment to be evaluated :type experiment: Experiment :raises: TypeError if self.experiment.data_pools is not a ResultDataPools """ super().register(experiment) if isinstance(self.experiment.data_pools, ResultDataPools): self.experiment.data_pools.result.add = Evaluate(self.experiment.data_pools.result.add) self.experiment.data_pools.result.add.pre(self._print_new_data_points) else: raise TypeError(f"PrintNewDataPointsEvaluator requires ResultDataPools")
def _print_new_data_points(self, data_points): """ _print_new_data_points(self, data_points) -> None | **Description** | Prints the given data points. :param data_points: The data points to be printed :type data_points: Tuple[NDArray[Shape["query_nr, ... query_dim"], Number], NDArray[Shape["query_nr, ... result_dim"], Number]] """ print(data_points)
[docs] class PrintQueryEvaluator(Evaluator): """ PrintQueryEvaluator() | **Description** | This evaluator keeps track of the experiment's queries. """
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to print new queries before adding them to the experiment's query queue. | Requires the experiment's oracle to be a POracles. :param experiment: The experiment to be evaluated :type experiment: Experiment :raises: TypeError if self.experiment.oracles is not a POracles """ super().register(experiment) if isinstance(self.experiment.oracles, POracles): self.experiment.oracles.process.add = Evaluate(self.experiment.oracles.process.add) self.experiment.oracles.process.add.pre(self.print_query) else: raise TypeError(f"PrintQueryEvaluator requires POracles")
[docs] def print_query(self, queries): """ print_query(self, queries) -> None | **Description** | Prints the given queries. :param queries: New queries going to the query queue :type queries: Tuple[NDArray[Shape["query_nr, ... query_dim"], Number] """ print("Queried: \n",queries)
[docs] class PrintExpTimeEvaluator(Evaluator): """ PrintExpTimeEvaluator() | **Description** | This evaluator measures how long the experiment takes to run. """
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to print how long it took to run. :param experiment: The experiment to be evaluated :type experiment: Experiment """ super().register(experiment) self.experiment.run = Evaluate(self.experiment.run) self.experiment.run.pre(self.start_time) self.experiment.run.post(self.end_time)
[docs] def start_time(self): """ start_time(self) -> None | **Description** | Prints an anouncement of time measurement. Saves the current time as the start time. """ print(f"Start timing for {self.experiment.exp_name} {self.experiment.exp_nr}") self.start = time.time()
[docs] def end_time(self): """ end_time(self) -> None | **Description** | Substracts the start time from the current time and prints the resulting time. """ end = time.time() print(f"Time for {self.experiment.exp_name} {self.experiment.exp_nr}: ",end - self.start)
[docs] class PrintTimeSourceEvaluator(Evaluator): """ PrintTimeSourceEvaluator() | **Description** | This Evaluator keeps track of the experiment's internal time. """
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment's time source to print the current internal time at each time step. :param experiment: The experiment to be evaluated :type experiment: Experiment """ super().register(experiment) self.experiment.time_source.step = Evaluate(self.experiment.time_source.step) self.experiment.time_source.step.post(self.end_time)
[docs] def end_time(self, time): """ end_time(self, time) -> None | **Description** | Prints the given time. :param time: The experiment's current internal time :type time: int """ print("Sim Unit Time: ", time)
[docs] @dataclass class PlotNewDataPointsEvaluator(LogingEvaluator): """ PlotNewDataPointsEvaluator() | **Description** | This evaluator plots all of the experiment's new data points continuously as they arrive. """ interactive: bool = False folder: str = "fig" fig_name:str = "Data" queries: NDArray[Shape["query_nr, ... query_dim"], Number] = pre_init(None) # type: ignore results: NDArray[Shape["query_nr, ... result_dim"], Number] = pre_init(None) # type: ignore
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to plot new data points before adding them to the experiment's result data pool. | Requires the experiment's data pool to be a ResultDataPool. :param experiment: The experiment to be evaluated :type experiment: Experiment :raises: TypeError if self.experiment.data_pools is not a ResultDataPools """ super().register(experiment) os.makedirs(self.path, exist_ok=True) if isinstance(self.experiment.data_pools, ResultDataPools): self.experiment.data_pools.result.add = Evaluate(self.experiment.data_pools.result.add) self.experiment.data_pools.result.add.pre(self.plot_new_data_points) else: raise TypeError(f"PlotNewDataPointsEvaluator requires ResultDataPools") self.queries: NDArray[Shape["query_nr, ... query_dim"], Number] = None # type: ignore self.results: NDArray[Shape["query_nr, ... result_dim"], Number] = None # type: ignore
[docs] def plot_new_data_points(self, data_points): """ plot_new_data_points(self, data_points) -> None | **Description** | Adds the given data points to the saved ones and updates the plot. :param data_points: New data points to be plotted :type data_points: Tuple[NDArray[Shape["query_nr, ... query_dim"], Number], NDArray[Shape["query_nr, ... result_dim"], Number]] """ self.experiment.iteration queries, results = data_points if self.queries is None: self.queries = queries self.results = results else: self.queries = np.concatenate((self.queries, queries)) self.results = np.concatenate((self.results, results)) fig = plot.figure(self.fig_name) plot.scatter(self.queries,self.results) plot.title(self.fig_name) if self.interactive: plot.show() else: plot.savefig(f'{self.path}/{self.fig_name}_{self.iteration:05d}.png') plot.clf()
[docs] @dataclass class PlotAllDataPointsEvaluator(LogingEvaluator): """ PlotALlDataPointsEvaluator() | **Description** | This evaluator plots all of the experiment's data points after the experiment has concluded. """ interactive: bool = False folder: str = "fig" fig_name:str = "AllData" data_pools: ResultDataPools = post_init()
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to plot all data points after running the experiment. | Requires the experiment's data pool to be a ResultDataPool. :param experiment: The experiment to be evaluated :type experiment: Experiment :raises: TypeError if self.experiment.data_pools is not a ResultDataPools """ super().register(experiment) os.makedirs(self.path, exist_ok=True) if isinstance(self.experiment.data_pools, ResultDataPools): self.data_pools = self.experiment.data_pools else: raise TypeError(f"PlotNewDataPointsEvaluator requires ResultDataPools") self.experiment.run = Evaluate(self.experiment.run) self.experiment.run.post(self.log_data)
[docs] def log_data(self): """ log_data(self) -> None | **Description** | Plots all of the experiment's data points. """ queries = self.data_pools.result.queries results = self.data_pools.result.results fig = plot.figure(self.fig_name) plot.scatter(queries,results) plot.title(self.fig_name) if self.interactive: plot.show() else: plot.savefig(f'{self.path}/{self.fig_name}.png') plot.clf()
[docs] @dataclass class PlotQueryDistEvaluator(LogingEvaluator): """ PlotQueryDistEvaluator() | **Description** | This evaluator plots all of the experiment's new queries continuously as they are made as a histogram. """ interactive: bool = False folder: str = "fig" fig_name:str = "Query distribution" queries: NDArray[Shape["query_nr, ... query_dim"], Number] = field(init = False, default = None) # type: ignore
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to plot the new queries as new frames of the histogram. | Requires the experiment's oracles to be a POracles. :param experiment: The experiment to be evaluated :type experiment: Experiment :raises: TypeError if self.experiment.oracles is not a POracles """ super().register(experiment) if isinstance(self.experiment.oracles, POracles): self.experiment.oracles.process.add = Evaluate(self.experiment.oracles.process.add) self.experiment.oracles.process.add.pre(self.plot_query_dist) else: raise TypeError(f"PlotQueryDistEvaluator requires POracles") self.queries: NDArray[Shape["query_nr, ... query_dim"], Number] = None # type: ignore
[docs] def plot_query_dist(self, queries): """ plot_query_dist(self, queries) -> None | **Description** | Plots the given queries on a new frame of the histogram. :param queries: New queries going to the query queue :type queries: Tuple[NDArray[Shape["query_nr, ... query_dim"], Number] """ if self.queries is None: self.queries = queries else: self.queries = np.concatenate((self.queries, queries)) fig = plot.figure(self.fig_name) plot.hist(self.queries) plot.title(self.fig_name) if self.interactive: plot.show() else: plot.savefig(f'{self.path}/{self.fig_name}_{self.iteration:05d}.png') plot.clf()
[docs] class PlotSampledQueriesEvaluator(LogingEvaluator): """ PlotSampledQueriesEvaluator() | **Description** | This evaluator plots the experiment's selected queries. """ interactive: bool = True folder: str = "fig" fig_name:str = "Sampled queries"
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to plot the queries passing its selection criteria. :param experiment: The experiment to be evaluated :type experiment: Experiments """ super().register(experiment) self.experiment.experiment_modules.query_selector.query_optimizer.selection_criteria.query = Evaluate(self.experiment.experiment_modules.query_selector.query_optimizer.selection_criteria.query) self.experiment.experiment_modules.query_selector.query_optimizer.selection_criteria.query.pre(self.plot_queries)
[docs] def plot_queries(self, queries): """ plot_queries(self, queries) -> None | **Description** | Plots the given queries. :param queries: New queries going to the query queue :type queries: Tuple[NDArray[Shape["query_nr, ... query_dim"], Number] """ fig = plot.figure(self.fig_name) plot.scatter(queries, [0 for i in range(queries.shape[0])]) plot.title(self.fig_name) if self.interactive: plot.show() else: plot.savefig(f'{self.path}/{self.fig_name}_{self.iteration:05d}.png') plot.clf()
[docs] @dataclass class LogOracleEvaluator(LogingEvaluator): """ LogOracleEvaluator() | **Description** | This evaluator logs all queries processed by the oracle. """ folder: str = "log" file_name:str = "oracle_data"
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to log all queries added to the oracle process. | Requires the experiment's oracles to be a POracles. :param experiment: The experiment to be evaluated :type experiment: Experiments :raises: TypeError if self.experiment.oracles is not a POracles """ super().register(experiment) if isinstance(self.experiment.oracles, POracles): self.experiment.oracles.process.add = Evaluate(self.experiment.oracles.process.add) self.experiment.oracles.process.add.pre(self.save_query) else: raise TypeError(f"LogOracleEvaluator requires POracles") self.experiment.run = Evaluate(self.experiment.run) self.experiment.run.post(self.log_data) self.queries = None
[docs] def save_query(self, queries): """ save_query(self, queries) -> None | **Description** | Saves the given query with the previously saved queries. :param queries: New queries going to the oracle process :type queries: Tuple[NDArray[Shape["query_nr, ... query_dim"], Number] """ if self.queries is None: self.queries = queries else: self.queries = np.concatenate((self.queries, queries))
[docs] def log_data(self): """ log_data(self) -> None | **Description** | Logs all saved queries to an ```.npy``` file. """ if not self.queries is None: np.save(f'{self.path}/{self.file_name}.npy', self.queries)
[docs] @dataclass class LogStreamEvaluator(LogingEvaluator): """ LogStreamEvaluator() | **Description** | This evaluator logs all data points added to the experiment's data pools stream. """ folder: str = "log" file_name:str = "stream"
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to log all data points added to the data pools stream. | Requires the experiment's data pools to be a StreamDataPools. :param experiment: The experiment to be evaluated :type experiment: Experiments :raises: TypeError if self.experiment.data_pools is not a StreamDataPools """ super().register(experiment) if isinstance(self.experiment.data_pools, StreamDataPools): self.experiment.data_pools.stream.add = Evaluate(self.experiment.data_pools.stream.add) self.experiment.data_pools.stream.add.pre(self.save_stream) else: raise TypeError(f"LogStreamEvaluator requires StreamDataPools") self.experiment.run = Evaluate(self.experiment.run) self.experiment.run.post(self.log_data) self.stream = None
[docs] def save_stream(self, data): """ save_stream(self, data) -> None | **Description** | Saves the given data point with the previously saved data points. :param queries: New data points going to the data pools stream :type queries: Tuple[Tuple[NDArray[Shape["query_nr, ... query_dim"], Number], Tuple[NDArray[Shape["result_nr, ... result_dim"], Number]] """ combined_data = np.concatenate((data[0], data[1]), axis=1) if self.stream is None: self.stream = combined_data else: self.stream = np.concatenate((self.stream, combined_data))
[docs] def log_data(self): """ log_data(self) -> None | **Description** | Logs all saved data points to an ```.npy``` file if there is at least one data point. """ if not self.stream is None: np.save(f'{self.path}/{self.file_name}.npy', self.stream)
[docs] @dataclass class LogProcessEvaluator(LogingEvaluator): """ LogProcessEvaluator() | **Description** | This evaluator logs all data points added to the experiment's data pools process. """ folder: str = "log" file_name:str = "process"
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to log all data points added to the data pools process. | Requires the experiment's data pools to be a ProcessDataPools. :param experiment: The experiment to be evaluated :type experiment: Experiments :raises: TypeError if self.experiment.data_pools is not a ProcessDataPools """ super().register(experiment) if isinstance(self.experiment.data_pools, ProcessDataPools): self.experiment.data_pools.process.add = Evaluate(self.experiment.data_pools.process.add) self.experiment.data_pools.process.add.pre(self.save_process) else: raise TypeError(f"LogProcessEvaluator requires ProcessDataPools") self.experiment.run = Evaluate(self.experiment.run) self.experiment.run.post(self.log_data) self.process = None
[docs] def save_process(self, data): """ save_process(self, data) -> None | **Description** | Saves the given data point with the previously saved data points. :param queries: New data points going to the data pools process :type queries: Tuple[Tuple[NDArray[Shape["query_nr, ... query_dim"], Number], Tuple[NDArray[Shape["result_nr, ... result_dim"], Number]] """ combined_data = np.concatenate((data[0], data[1]), axis=1) if self.process is None: self.process = combined_data else: self.process = np.concatenate((self.process, combined_data))
[docs] def log_data(self): """ log_data(self) -> None | **Description** | Logs all saved data points to an ```.npy``` file if there is at least one data point. """ if not self.process is None: np.save(f'{self.path}/{self.file_name}.npy', self.process)
[docs] @dataclass class LogResultEvaluator(LogingEvaluator): """ LogProcessEvaluator() | **Description** | This evaluator logs all results added to the experiment's data pools results. """ folder: str = "log" file_name:str = "result"
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to log all results added to the data pools results. | Requires the experiment's data pools to be a ResultDataPools. :param experiment: The experiment to be evaluated :type experiment: Experiments :raises: TypeError if self.experiment.data_pools is not a ResultDataPools """ super().register(experiment) if isinstance(self.experiment.data_pools, ResultDataPools): self.experiment.data_pools.result.add = Evaluate(self.experiment.data_pools.result.add) self.experiment.data_pools.result.add.pre(self.save_result) else: raise TypeError(f"LogResultEvaluator requires ResultDataPools") self.experiment.run = Evaluate(self.experiment.run) self.experiment.run.post(self.log_data) self.results = None
[docs] def save_result(self, data): """ save_result(self, data) -> None | **Description** | Saves the given result with the previously saved results. :param queries: New results going to the data pools results :type queries: Tuple[Tuple[NDArray[Shape["query_nr, ... query_dim"], Number], Tuple[NDArray[Shape["result_nr, ... result_dim"], Number]] """ combined_data = np.concatenate((data[0], data[1]), axis=1) if self.results is None: self.results = combined_data else: self.results = np.concatenate((self.results, combined_data))
[docs] def log_data(self): """ log_data(self) -> None | **Description** | Logs all saved results to an ```.npy``` file if there is at least one result. """ if not self.results is None: np.save(f'{self.path}/{self.file_name}.npy', self.results)
[docs] @dataclass class LogAllEvaluator(LogingEvaluator): """ LogAllEvaluator() | **Description** | This evaluator logs combines the LogStreamEvaluator, LogProcessEvaluator and LogResultEvaluator. """ folder: str = "log" file_name:str = "all_data" lsev: LogStreamEvaluator = post_init() lpev: LogProcessEvaluator = post_init() lrev: LogResultEvaluator = post_init()
[docs] def post_init(self): """ post_init(self) -> None | **Description** | Initializes the LogStream-, LogProcess- and LogResult- evaluators """ super().post_init() self.lsev = LogStreamEvaluator(folder=self.folder, file_name=f"{self.file_name}_stream")() self.lpev = LogProcessEvaluator(folder=self.folder, file_name=f"{self.file_name}_process")() self.lrev = LogResultEvaluator(folder=self.folder, file_name=f"{self.file_name}_result")()
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Registers the experiment with all 3 above listed evaluators. """ super().register(experiment) self.lsev.register(experiment = experiment) self.lpev.register(experiment = experiment) self.lrev.register(experiment = experiment)
[docs] @dataclass class LogTVPGTEvaluator(LogingEvaluator): """ LogTVPGTEvaluator() | **Description** | The Log Time Varying Process Ground Truth Evaluator --- """ folder: str = "log" file_name:str = "gt_data"
[docs] def register(self, experiment: Experiment): """ register(self, experiment) -> None | **Description** | Modifies the experiment to log all results coming form the process' update. | Requires the experiment's process to be a DelayedProcess. :param experiment: The experiment to be evaluated :type experiment: Experiments :raises: TypeError if self.experiment.process is not a DelayedProcess """ super().register(experiment) if isinstance(self.experiment.process, DelayedProcess): self.experiment.process.update = Evaluate(self.experiment.process.update) self.experiment.process.update.post(self.save_gt) else: raise TypeError(f"LogTVPGTEvaluator requires DelayedProcess") self.experiment.run = Evaluate(self.experiment.run) self.experiment.run.post(self.log_data) self.gt = None
[docs] def save_gt(self, data): """ save_gt(self, data) -> None | **Description** | Saves the given data (ground truths) points with the previously saved data points. :param queries: New results going to the data pools results :type queries: Tuple[Tuple[NDArray[Shape["query_nr, ... query_dim"], Number], Tuple[NDArray[Shape["result_nr, ... result_dim"], Number]] """ gt_queries, gt_results = data combined_data = np.concatenate((gt_queries, gt_results), axis=1) if self.gt is None: self.gt = combined_data else: self.gt = np.concatenate((self.gt, combined_data))
[docs] def log_data(self): """ log_data(self) -> None | **Description** | Logs all saved data points (ground truths) to an ```.npy``` file if there is at least one data point. """ if not self.gt is None: np.save(f'{self.path}/{self.file_name}.npy', self.gt)