"""Experiment design heuristics (qmla.shared_functionality.experiment_design_heuristics)."""

import qinfer as qi
import numpy as np
import scipy as sp
import random
import math
import copy
import itertools

import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
from inspect import currentframe, getframeinfo

try:
    # Prefer the standalone ``lfig`` package when it is installed.
    from lfig import LatexFigure
except ImportError:
    # Fall back to the copy bundled with QMLA. Narrowed from a bare
    # ``except:`` so that unrelated errors inside lfig are not hidden.
    from qmla.shared_functionality.latex_figure import LatexFigure

import qmla.logging


# Frame info captured at import time (available for debugging/log messages).
frameinfo = getframeinfo(currentframe())

# Public API of this module. NOTE: the "Hueristic" misspelling is the
# exported class name and is kept for backwards compatibility.
__all__ = [
    "ExperimentDesignHueristic",
    "MultiParticleGuessHeuristic",
    "MixedMultiParticleLinspaceHeuristic",
    "VolumeAdaptiveParticleGuessHeuristic",
]


def identity(arg):
    """Return *arg* unchanged; default for the heuristics' ``inv_func``/``t_func`` hooks."""
    return arg


class ExperimentDesignHueristic(qi.Heuristic):
    """
    Experiment Design Heuristic base class, to be inherited by specific implementations.

    This object has access to the QInfer Updater and Model objects, so it can,
    e.g., sample from the particle distribution, to use these values in the
    design of a new experiment.

    :param updater: QInfer updater for SMC
    :type updater: QInfer Updater object
    :param model_id: ID of model under study, defaults to 1
    :type model_id: int
    :param oplist: list of matrices representing the operators constituting
        this model, defaults to None
    :type oplist: list, optional
    :param norm: type of norm to use, defaults to 'Frobenius'
    :type norm: str, optional
    :param inv_field: inversion field to use (legacy - should not matter),
        defaults to 'x_'
    :type inv_field: str, optional
    :param t_field: name of field corresponding to $t$, defaults to 't'
    :type t_field: str, optional
    :param maxiters: maximum number of iterations to attempt to find distinct
        particles from the distribution, defaults to 10
    :type maxiters: int, optional
    :param other_fields: optional further fields, defaults to None
    :type other_fields: list, optional
    :param inv_func: inverse function, used by QInfer (legacy - should not
        matter), defaults to identity
    :type inv_func: function, optional
    :param t_func: function for computing $t$, defaults to identity
    :type t_func: function, optional
    :param log_file: path to log file, defaults to 'qmla_log.log'
    :type log_file: str, optional
    """

    def __init__(
        self,
        updater,
        model_id=1,
        oplist=None,
        norm="Frobenius",
        inv_field="x_",
        t_field="t",
        maxiters=10,
        other_fields=None,
        inv_func=identity,
        t_func=identity,
        log_file="qmla_log.log",
        **kwargs
    ):
        super().__init__(updater)

        # Most importantly - access to updater and underlying model
        self._model_id = model_id
        self._updater = updater
        self._model = updater.model

        # Other useful attributes passed
        self._norm = norm
        self._x_ = inv_field
        self._t = t_field
        self._inv_func = inv_func
        self._t_func = t_func
        self._other_fields = other_fields if other_fields is not None else {}
        self._maxiters = maxiters
        self._oplist = oplist
        self._num_experiments = kwargs["num_experiments"]
        self._figure_format = kwargs["figure_format"]
        self._log_file = log_file

        # probe ID: rotated every `probe_rotation_frequency` epochs
        self.probe_id = 0
        self.probe_rotation_frequency = 5
        self.num_probes = kwargs["num_probes"]

        # storage infrastructure
        self.heuristic_data = {}  # to be stored by model instance
        self._resample_epochs = []
        self._volumes = []
        self.effective_sample_size = []
        self._times_suggested = []
        self._label_fontsize = 10  # consistency when plotting

    def _get_exp_params_array(self, epoch_id):
        r"""Return an experiment-parameter array pre-filled with a sampled particle
        and the probe ID for this epoch."""
        experiment_params = np.empty((1,), dtype=self._model.expparams_dtype)

        # fill in particle in expparams
        particle = self._updater.sample()
        n_params = particle.shape[1]
        for i in range(n_params):
            p = particle[0][i]
            corresponding_expparam = self._model.modelparam_names[i]
            experiment_params[corresponding_expparam] = p

        # choose probe id: advance (cyclically) every few epochs
        if epoch_id % self.probe_rotation_frequency == 0:
            self.probe_id += 1
            if self.probe_id >= self.num_probes:
                self.probe_id = 0
        experiment_params["probe_id"] = self.probe_id

        return experiment_params

    def log_print(self, to_print_list):
        r"""Wrapper for :func:`~qmla.print_to_log`"""
        qmla.logging.print_to_log(
            to_print_list=to_print_list,
            log_file=self._log_file,
            log_identifier="Heuristic {}".format(
                self._model_id
            ),  # TODO add heuristic name
        )

    def __call__(self, **kwargs):
        """By calling the heuristic, it produces an experiment to be performed
        to learn upon.

        :return: all necessary data to perform an experiment, e.g. evolution
            time and probe ID.
        :rtype: named tuple
        """
        # Process some data from the model first.
        # (Was a bare try/except; `get` returns None when not supplied.)
        current_volume = kwargs.get("current_volume")
        self._volumes.append(current_volume)

        if self._updater.just_resampled:
            self._resample_epochs.append(kwargs["epoch_id"] - 1)
        self.effective_sample_size.append(self._updater.n_ess)

        # Design a new experiment
        new_experiment = self.design_experiment(**kwargs)
        new_time = new_experiment["t"]
        if new_time > 1e6:
            # TODO understand cutoff at which time calculation becomes unstable
            new_time = np.random.uniform(1e5, 1e6)
        if "force_time_choice" in kwargs:
            new_time = kwargs["force_time_choice"]

        self._times_suggested.append(new_time)
        new_experiment["t"] = new_time
        return new_experiment

    def design_experiment(self, **kwargs):
        r"""
        Design an experiment.

        Children classes can overwrite this function to implement custom logic
        for the design of experiments.
        """
        raise RuntimeError("experiment design method not written for this heuristic.")

    def finalise_heuristic(self, **kwargs):
        r"""Any functionality the user wishes to happen at the final call to the heuristic."""
        self.log_print(
            [
                "{} Resample epochs: {}".format(
                    len(self._resample_epochs),
                    self._resample_epochs,
                )
            ]
        )

    def plot_heuristic_attributes(self, save_to_file, **kwargs):
        """
        Summarise the heuristic used for the model training through several plots.

        - volume of distribution at each experiment
        - time designed by heuristic for each experiment
        - effective sample size at each experiment, used to determine when to resample

        :param save_to_file: path to which the summary figure is stored
        :type save_to_file: path
        """
        plots_to_include = ["volume", "times_used", "effective_sample_size"]

        plt.clf()
        nrows = len(plots_to_include)
        lf = LatexFigure(gridspec_layout=(nrows, 1))

        if "volume" in plots_to_include:
            ax = lf.new_axis()
            self._plot_volumes(ax=ax)
            ax.legend()

        if "times_used" in plots_to_include:
            ax = lf.new_axis()
            self._plot_suggested_times(ax=ax)
            ax.legend()

        if "effective_sample_size" in plots_to_include:
            ax = lf.new_axis()
            self._plot_effective_sample_size(ax=ax)
            ax.legend()

        # Save figure
        self.log_print(["LatexFigure has size:", lf.size])
        lf.save(save_to_file, file_format=self._figure_format)

    def _plot_suggested_times(self, ax, **kwargs):
        # Scatter of the time designed at every epoch (log-scale y axis).
        full_epoch_list = range(len(self._times_suggested))
        ax.scatter(
            full_epoch_list,
            self._times_suggested,
            label=r"$t \sim k \ \frac{1}{V}$",
            s=5,
        )
        ax.set_title("Experiment times", fontsize=self._label_fontsize)
        ax.set_ylabel("Time", fontsize=self._label_fontsize)
        ax.set_xlabel("Epoch", fontsize=self._label_fontsize)
        self._add_resample_epochs_to_ax(ax=ax)
        ax.semilogy()

    def _plot_volumes(self, ax, **kwargs):
        # Volume of the parameter distribution per epoch (log-scale y axis).
        full_epoch_list = range(len(self._volumes))
        ax.plot(
            full_epoch_list,
            self._volumes,
            label="Volume",
        )
        ax.set_title("Volume", fontsize=self._label_fontsize)
        ax.set_ylabel("Volume", fontsize=self._label_fontsize)
        ax.set_xlabel("Epoch", fontsize=self._label_fontsize)
        self._add_resample_epochs_to_ax(ax=ax)
        ax.semilogy()

    def _plot_effective_sample_size(self, ax, **kwargs):
        # N_ESS per epoch, with horizontal lines marking the resampling threshold.
        full_epoch_list = range(len(self.effective_sample_size))
        ax.plot(
            full_epoch_list,
            self.effective_sample_size,
            label=r"$N_{ESS}$",
        )
        resample_thresh = self._updater.resample_thresh
        ax.axhline(
            resample_thresh * self.effective_sample_size[0],
            label="Resample threshold ({}%)".format(resample_thresh * 100),
            color="grey",
            ls="-",
            alpha=0.5,
        )
        if resample_thresh != 0.5:
            # Also draw the 50% mark for reference when the threshold differs.
            ax.axhline(
                self.effective_sample_size[0] / 2,
                label="50%",
                color="grey",
                ls="--",
                alpha=0.5,
            )

        ax.set_title("Effective Sample Size", fontsize=self._label_fontsize)
        ax.set_ylabel("$N_{ESS}$", fontsize=self._label_fontsize)
        ax.set_xlabel("Epoch", fontsize=self._label_fontsize)
        self._add_resample_epochs_to_ax(ax=ax)
        ax.legend()
        ax.set_ylim(0, self.effective_sample_size[0] * 1.1)

    def _add_resample_epochs_to_ax(self, ax, **kwargs):
        # Vertical dotted lines at every epoch where the updater resampled;
        # only the first carries a legend label.
        c = "grey"
        a = 0.5
        ls = ":"
        if len(self._resample_epochs) > 0:
            ax.axvline(
                self._resample_epochs[0], ls=ls, color=c, label="Resample", alpha=a
            )
            for e in self._resample_epochs[1:]:
                ax.axvline(e, ls=ls, color=c, alpha=a)
class MultiParticleGuessHeuristic(ExperimentDesignHueristic):
    """Standard particle-guess heuristic: t = 1/distance between two sampled particles."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.log_print(["Particle Guess Heuristic"])

    def design_experiment(self, epoch_id=0, **kwargs):
        # Draw two distinct particles from the posterior; retry up to maxiters.
        idx_iter = 0
        while idx_iter < self._maxiters:
            sample = self._updater.sample(n=2)
            x, xp = sample[:, np.newaxis, :]
            if self._model.distance(x, xp) > 0:
                break
            else:
                idx_iter += 1
        if self._model.distance(x, xp) == 0:
            self.log_print(["x,xp={},{}".format(x, xp)])
            raise RuntimeError(
                "PGH did not find distinct particles in \
                {} iterations.".format(
                    self._maxiters
                )
            )

        d = self._model.distance(x, xp)
        new_time = 1 / d
        eps = self._get_exp_params_array(epoch_id=epoch_id)
        eps["t"] = new_time

        # Overwrite the particle chosen by _get_exp_params_array with a fresh
        # sample (used by IQLE-style experiments).
        particle = self._updater.sample()
        n_params = particle.shape[1]
        for i in range(n_params):
            p = particle[0][i]
            corresponding_expparam = self._model.modelparam_names[i]
            eps[corresponding_expparam] = p

        return eps


class MixedMultiParticleLinspaceHeuristic(ExperimentDesignHueristic):
    r"""
    First half of experiments are standard MPGH, then force times evenly spaced
    between 0 and max_time.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.log_print(["Mixed Particle Guess Heuristic"])
        self.max_time_to_enforce = kwargs["max_time_to_enforce"]
        self.count_number_high_times_suggested = 0
        self.num_epochs_for_first_phase = self._num_experiments / 2

        # Generate a list of times of length Ne/2, evenly spaced between
        # 0 and max_time (from exploration strategy); every t in that list is
        # learned upon once. Higher Ne means finer granularity.
        num_epochs_to_space_time_list = math.ceil(
            self._num_experiments - self.num_epochs_for_first_phase
        )
        t_list = list(
            np.linspace(0, self.max_time_to_enforce, num_epochs_to_space_time_list + 1)
        )
        t_list.remove(0)  # dont want to waste an epoch on t=0
        t_list = [np.round(t, 2) for t in t_list]
        self._time_list = itertools.cycle(t_list)

    def design_experiment(self, epoch_id=0, **kwargs):
        idx_iter = 0
        while idx_iter < self._maxiters:
            x, xp = self._updater.sample(n=2)[:, np.newaxis, :]
            if self._model.distance(x, xp) > 0:
                break
            else:
                idx_iter += 1
        if self._model.distance(x, xp) == 0:
            raise RuntimeError(
                "PGH did not find distinct particles in \
                {} iterations.".format(
                    self._maxiters
                )
            )

        eps = self._get_exp_params_array(epoch_id=epoch_id)

        # First phase: PGH time; second phase: cycle through the linspace times.
        if epoch_id < self.num_epochs_for_first_phase:
            d = self._model.distance(x, xp)
            new_time = self._t_func(1 / d)
        else:
            new_time = next(self._time_list)

        if new_time > self.max_time_to_enforce:
            self.count_number_high_times_suggested += 1

        if epoch_id == self._num_experiments - 1:
            self.log_print(
                [
                    "Number of suggested t > t_max:",
                    self.count_number_high_times_suggested,
                ]
            )

        eps["t"] = new_time
        return eps


class SampleOrderMagnitude(ExperimentDesignHueristic):
    """Heuristic that focuses the distance metric on a sampled order of magnitude
    of the current parameter uncertainties."""

    def __init__(self, updater, **kwargs):
        super().__init__(updater, **kwargs)
        self.count_order_of_magnitudes = {}
        self.force_order = None

    def design_experiment(self, epoch_id=0, **kwargs):
        experiment = self._get_exp_params_array(
            epoch_id=epoch_id
        )  # empty experiment array

        # sample from updater
        idx_iter = 0
        while idx_iter < self._maxiters:
            x, xp = self._updater.sample(n=2)[:, np.newaxis, :]
            if self._model.distance(x, xp) > 0:
                break
            else:
                idx_iter += 1

        cov_mtx = self._updater.est_covariance_mtx()
        orders_of_magnitude = np.log10(
            np.sqrt(np.abs(np.diag(cov_mtx)))
        )  # of the uncertainty on the individual parameters
        orders_of_magnitude[orders_of_magnitude < 1] = 1  # lower bound
        probs_of_orders = [
            i / sum(orders_of_magnitude) for i in orders_of_magnitude
        ]  # weight of each order of magnitude

        # sample from the present orders of magnitude
        selected_order = np.random.choice(a=orders_of_magnitude, p=probs_of_orders)
        if self.force_order is not None:
            selected_order = self.force_order

        # Count how often each (rounded) order is targeted.
        # (Was a bare try/except KeyError pattern.)
        key = np.round(selected_order)
        self.count_order_of_magnitudes[key] = (
            self.count_order_of_magnitudes.get(key, 0) + 1
        )

        idx_params_of_similar_uncertainty = np.where(
            np.isclose(orders_of_magnitude, selected_order, atol=1)
        )  # within 1 order of magnitude of the max

        # change the scaling matrix used to calculate the distance
        # to place importance only on the sampled order of magnitude
        self._model._Q = np.zeros(len(orders_of_magnitude))
        for idx in idx_params_of_similar_uncertainty:
            self._model._Q[idx] = 1

        d = self._model.distance(x, xp)
        new_time = 1 / d
        if self.force_order == 9:
            new_time *= 100

        experiment[self._t] = new_time
        return experiment

    def finalise_heuristic(self, **kwargs):
        # Accepts **kwargs to stay signature-compatible with the base class.
        super().finalise_heuristic(**kwargs)
        self.log_print(["count_order_of_magnitudes:", self.count_order_of_magnitudes])


class SampledUncertaintyWithConvergenceThreshold(ExperimentDesignHueristic):
    """Heuristic selecting which parameter's uncertainty scale to target, by one
    of several selection criteria (volume decrease, hard-coded orders, etc.)."""

    def __init__(self, updater, **kwargs):
        super().__init__(updater, **kwargs)
        self._qinfer_model = self._model

        cov_mtx = self._updater.est_covariance_mtx()
        self.initial_uncertainties = np.sqrt(np.abs(np.diag(cov_mtx)))
        self.track_param_uncertainties = np.zeros(self._qinfer_model.n_modelparams)
        self.selection_criteria = (
            "relative_volume_decrease"  # 'hard_code_6' # 'hard_code_6_9_magnitudes'
        )
        self.count_order_of_magnitudes = {}
        self.all_count_order_of_magnitudes = {}
        self.counter_productive_experiments = 0
        self.call_counter = 0
        # NOTE(review): base __init__ already sets _num_experiments; kept for parity.
        self._num_experiments = kwargs["num_experiments"]
        self._num_exp_to_switch_magnitude = self._num_experiments / 2
        print("Heuristic - num experiments = ", self._num_experiments)
        print("epoch to switch target at:", self._num_exp_to_switch_magnitude)

    def design_experiment(self, epoch_id=0, **kwargs):
        self.call_counter += 1
        experiment = self._get_exp_params_array(
            epoch_id=epoch_id
        )  # empty experiment array

        # sample from updater
        idx_iter = 0
        while idx_iter < self._maxiters:
            x, xp = self._updater.sample(n=2)[:, np.newaxis, :]
            if self._model.distance(x, xp) > 0:
                break
            else:
                idx_iter += 1

        current_param_est = self._updater.est_mean()
        cov_mtx = self._updater.est_covariance_mtx()
        param_uncertainties = np.sqrt(
            np.abs(np.diag(cov_mtx))
        )  # uncertainty of params individually
        orders_of_magnitude = np.log10(
            param_uncertainties
        )  # of the uncertainty on the individual parameters
        param_order_mag = np.log10(current_param_est)
        relative_order_magnitude = param_order_mag / max(param_order_mag)
        weighting_by_relative_order_magnitude = 10 ** relative_order_magnitude
        self.track_param_uncertainties = np.vstack(
            (self.track_param_uncertainties, param_uncertainties)
        )

        if self.selection_criteria.startswith("hard_code"):
            if self.selection_criteria == "hard_code_6_9_magnitudes":
                if self.call_counter > self._num_exp_to_switch_magnitude:
                    order_to_target = 6
                else:
                    order_to_target = 9
            elif self.selection_criteria == "hard_code_6":
                order_to_target = 6
            locations = np.where(
                np.isclose(orders_of_magnitude, order_to_target, atol=1)
            )
            weights = np.zeros(len(orders_of_magnitude))
            weights[locations] = 1
            probability_of_param = weights / sum(weights)
        elif self.selection_criteria == "relative_volume_decrease":
            # probability of choosing order of magnitude of each parameter
            # based on the ratio (change in volume)/(current estimate)
            # for that parameter
            print("Sampling by delta uncertainty/ estimate")
            change_in_uncertainty = np.diff(
                self.track_param_uncertainties[-2:],  # most recent two track-params
                axis=0,
            )[0]
            print("change in uncertainty=", change_in_uncertainty)
            # NOTE(review): np.diff is (new - old), so negative values mean the
            # uncertainty DECREASED; the branch message/comment below may have
            # the sign convention inverted — verify against callers.
            if np.all(change_in_uncertainty < 0):
                # TODO better way to deal with all increasing uncertainties
                print("All parameter uncertainties increased")
                self.counter_productive_experiments += 1
                weights = 1 / np.abs(change_in_uncertainty)
            else:
                # disregard changes which INCREASE volume:
                change_in_uncertainty[change_in_uncertainty < 0] = 0
                # weight = ratio of how much that change has decreased the volume
                # over the current best estimate of the parameter
                weights = change_in_uncertainty / current_param_est
                # weight the likelihood of selecting a parameter by its order of magnitude
                weights *= weighting_by_relative_order_magnitude
            probability_of_param = weights / sum(weights)
        elif self.selection_criteria == "order_of_magniutde":
            # probability directly from order of magnitude
            # NOTE(review): "magniutde" typo preserved — this string is compared
            # at runtime, so correcting it would change behavior for callers.
            print("Sampling by order magnitude")
            probability_of_param = np.array(orders_of_magnitude) / sum(
                orders_of_magnitude
            )
        else:
            # sample evenly
            # NOTE(review): not normalized — np.random.choice requires p to sum
            # to 1, so this branch likely raises for >1 parameter; confirm.
            print("Sampling evenly")
            probability_of_param = np.ones(self._qinfer_model.n_modelparams)

        # sample from the present orders of magnitude
        selected_order = np.random.choice(a=orders_of_magnitude, p=probability_of_param)
        key = np.round(selected_order)
        self.count_order_of_magnitudes[key] = (
            self.count_order_of_magnitudes.get(key, 0) + 1
        )
        self.all_count_order_of_magnitudes[key] = (
            self.all_count_order_of_magnitudes.get(key, 0) + 1
        )

        idx_params_of_similar_uncertainty = np.where(
            np.isclose(orders_of_magnitude, selected_order, atol=1)
        )  # within 1 order of magnitude of the max
        self._model._Q = np.zeros(len(orders_of_magnitude))
        for idx in idx_params_of_similar_uncertainty:
            self._qinfer_model._Q[idx] = 1

        d = self._qinfer_model.distance(x, xp)
        new_time = 1 / d
        experiment[self._t] = new_time

        print("Current param estimates:", current_param_est)
        try:
            print("Weights:", weights)
        except NameError:
            # weights only bound on some criteria branches
            pass
        print("probability_of_param: ", probability_of_param)
        print("orders_of_magnitude:", orders_of_magnitude)
        print("Selected order = ", selected_order)
        print("x={}".format(x))
        print("xp={}".format(xp))
        print("Distance = ", d)
        print("Distance order mag=", np.log10(d))
        print("=> time=", new_time)
        return experiment


class VolumeAdaptiveParticleGuessHeuristic(ExperimentDesignHueristic):
    """PGH variant that adapts a multiplicative time factor based on the
    relative change in the distribution volume."""

    def __init__(self, updater, **kwargs):
        super().__init__(updater, **kwargs)
        self.time_multiplicative_factor = 1
        self.derivative_frequency = self._num_experiments / 20
        self.burn_in_learning_time = 6 * self.derivative_frequency
        self.log_print(
            [
                "Derivative freq:{} \t burn in:{}".format(
                    self.derivative_frequency, self.burn_in_learning_time
                )
            ]
        )
        self.time_factor_boost = 10  # factor to increase/decrease by
        self.derivatives = {1: {}, 2: {}}
        self.time_factor_changes = {"decreasing": [], "increasing": []}
        self.distances = []
        distance_metrics = [
            "cityblock",
            "euclidean",
            "chebyshev",
            "canberra",
            "braycurtis",
            "minkowski",
        ]
        self.designed_times = {m: {} for m in distance_metrics}
        self.distance_metric_to_use = "euclidean"

    def design_experiment(self, epoch_id=0, **kwargs):
        # Maybe increase multiplicative factor for time chosen later
        if (
            epoch_id % self.derivative_frequency == 0
            and epoch_id > self.burn_in_learning_time
        ):
            current_volume = self._volumes[-1]
            previous_epoch_to_compare = int(-1 - self.derivative_frequency)
            try:
                previous_volume = self._volumes[previous_epoch_to_compare]
            except IndexError:
                # Not enough history yet; fall back to the first recorded volume.
                previous_volume = self._volumes[0]
                self.log_print(
                    [
                        "Couldn't find {}th element of volumes: {}".format(
                            previous_epoch_to_compare, self._volumes
                        )
                    ]
                )

            relative_change = 1 - current_volume / previous_volume
            self.log_print(
                [
                    "At epoch {} V_old/V_new={}/{}. relative change={}".format(
                        epoch_id,
                        np.round(previous_volume, 2),
                        np.round(current_volume, 2),
                        relative_change,
                    )
                ]
            )
            expected_change = 0.2  # N% decrease in volume after N experiments (?)
            if 0 < relative_change <= expected_change:
                # Volume decreasing too slowly -> probe longer times.
                self.time_multiplicative_factor *= self.time_factor_boost
                print(
                    "Epoch {} r={}. Increasing time factor: {}".format(
                        epoch_id, relative_change, self.time_multiplicative_factor
                    )
                )
                self.time_factor_changes["increasing"].append(epoch_id)
            elif relative_change < -0.1:
                self.time_multiplicative_factor /= (
                    self.time_factor_boost
                )  # volume increasing -> use lower times
                print(
                    "Epoch {} r={}. Decreasing time factor: {}".format(
                        epoch_id, relative_change, self.time_multiplicative_factor
                    )
                )
                self.time_factor_changes["decreasing"].append(epoch_id)
            elif relative_change > 0.1:
                self.time_multiplicative_factor *= 1  # learning well enough
                print(
                    "Epoch {} r={}. Maintaining time factor: {}".format(
                        epoch_id, relative_change, self.time_multiplicative_factor
                    )
                )

        # Select particles
        idx_iter = 0
        while idx_iter < self._maxiters:
            sample = self._updater.sample(n=2)
            x, xp = sample[:, np.newaxis, :]
            if self._model.distance(x, xp) > 0:
                break
            else:
                idx_iter += 1
        if self._model.distance(x, xp) == 0:
            raise RuntimeError(
                "PGH did not find distinct particles in \
                {} iterations.".format(
                    self._maxiters
                )
            )

        # Record 1/distance under every metric for later comparison plots.
        for method in self.designed_times:
            d = sp.spatial.distance.pdist(sample, metric=method)
            self.designed_times[method][epoch_id] = 1 / d

        eps = self._get_exp_params_array(epoch_id=epoch_id)
        new_time = copy.copy(self.designed_times[self.distance_metric_to_use][epoch_id])
        new_time *= self.time_multiplicative_factor
        eps["t"] = new_time
        return eps

    def plot_heuristic_attributes(self, save_to_file, **kwargs):
        """Plot results related to the experiment design heuristic.

        Plots
            times generated by the heuristic;
            metric distances: distance between the two sampled particles under
                various measures;
            volume of the parameter distribution at each experiment.

        :param save_to_file: path to which to save the resultant figure.
        :type save_to_file: path
        """
        plots_to_include = [
            "volume",
            "metric_distances",
            "times_used",
        ]

        plt.clf()
        nrows = len(plots_to_include)
        lf = LatexFigure(gridspec_layout=(nrows, 1))

        # Volume
        if "volume" in plots_to_include:
            ax = lf.new_axis()
            self._plot_volumes(ax=ax)
            self.add_time_factor_change_points_to_ax(ax=ax)
            ax.legend()

        if "times_used" in plots_to_include:
            ax = lf.new_axis()
            self._plot_suggested_times(ax=ax)
            self.add_time_factor_change_points_to_ax(ax=ax)
            ax.legend()

        if "derivatives" in plots_to_include:
            # Volume Derivatives
            ax = lf.new_axis()

            ## first derivatives
            derivs = self.derivatives[1]
            epochs = sorted(derivs.keys())
            first_derivatives = [derivs[e] if e in derivs else None for e in epochs]
            ax.plot(
                epochs,
                first_derivatives,
                label=r"$\frac{dV}{dE}$",
                color="darkblue",
                marker="x",
            )

            ## second derivatives
            derivs = self.derivatives[2]
            epochs = sorted(derivs.keys())
            second_derivatives = [derivs[e] if e in derivs else None for e in epochs]
            ax.plot(
                epochs,
                second_derivatives,
                label=r"$\frac{d^2V}{dE^2}$",
                color="maroon",
                marker="+",
            )
            ax.axhline(0, ls="--", alpha=0.3)
            ax.legend(loc="upper right")
            ax.set_yscale("symlog")
            ax.set_ylabel("Derivatives", fontsize=self._label_fontsize)
            ax.set_xlabel("Epoch", fontsize=self._label_fontsize)

            if len(self.time_factor_changes["decreasing"]) > 0:
                ax.axvline(
                    self.time_factor_changes["decreasing"][0],
                    ls="--",
                    color="red",
                    label="decrease k",
                    alpha=0.5,
                )
                for e in self.time_factor_changes["decreasing"][1:]:
                    ax.axvline(e, ls="--", color="red", alpha=0.5)
            if len(self.time_factor_changes["increasing"]) > 0:
                ax.axvline(
                    self.time_factor_changes["increasing"][0],
                    ls="--",
                    color="green",
                    label="increase k",
                    alpha=0.5,
                )
                for e in self.time_factor_changes["increasing"][1:]:
                    ax.axvline(e, ls="--", color="green", alpha=0.5)
            ax.legend(loc="lower right")

        if "metric_distances" in plots_to_include:
            # Times by distance metrics
            ax = lf.new_axis()
            linestyles = itertools.cycle(["--", ":", "-."])
            for method in self.designed_times:
                times_of_method = self.designed_times[method]
                epochs = sorted(times_of_method.keys())
                time_by_epoch = [times_of_method[e] for e in epochs]
                lb = method
                if self.distance_metric_to_use == method:
                    ls = "-"
                    lb += " (used)"
                    alpha = 1
                else:
                    ls = next(linestyles)
                    alpha = 0.75
                ax.plot(epochs, time_by_epoch, label=lb, ls=ls, alpha=alpha)
            ax.legend(title="Distance metric")
            ax.set_title(
                "Raw time chosen by distance metrics", fontsize=self._label_fontsize
            )
            ax.set_ylabel("Time", fontsize=self._label_fontsize)
            ax.set_xlabel("Epoch", fontsize=self._label_fontsize)
            ax.semilogy()

        # Save figure
        self.log_print(["LatexFigure has size:", lf.size])
        lf.save(save_to_file, file_format=self._figure_format)

    def add_time_factor_change_points_to_ax(self, ax):
        # Mark epochs where the multiplicative time factor changed;
        # only the first line of each kind gets a legend label.
        if len(self.time_factor_changes["decreasing"]) > 0:
            ax.axvline(
                self.time_factor_changes["decreasing"][0],
                ls="--",
                color="red",
                label=r"$k \rightarrow k / {}$".format(
                    np.round(self.time_factor_boost, 2)
                ),
                alpha=0.5,
            )
            for e in self.time_factor_changes["decreasing"][1:]:
                ax.axvline(e, ls="--", color="red", alpha=0.5)
        if len(self.time_factor_changes["increasing"]) > 0:
            ax.axvline(
                self.time_factor_changes["increasing"][0],
                ls="--",
                color="green",
                label=r"$ k \rightarrow k \times {}$".format(
                    np.round(self.time_factor_boost, 2)
                ),
                alpha=0.5,
            )
            for e in self.time_factor_changes["increasing"][1:]:
                ax.axvline(e, ls="--", color="green", alpha=0.5)


class FixedNineEighthsToPowerK(ExperimentDesignHueristic):
    """Deterministic time sequence t = (9/8)^k, incrementing k every 10 epochs."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._k = -25

    def design_experiment(self, epoch_id=0, **kwargs):
        if epoch_id % 10 == 0:
            # don't want to increment at every single experiment when there are
            # a lot to do, b/c t becomes far too big
            self._k += 1
        new_time = (9 / 8) ** self._k
        eps = self._get_exp_params_array(epoch_id=epoch_id)
        eps["t"] = new_time
        return eps


class RandomTimeUpperBounded(ExperimentDesignHueristic):
    """Uniformly random time in [0, max_time_to_enforce]."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.max_time_to_enforce = kwargs["max_time_to_enforce"]

    def design_experiment(self, epoch_id=0, **kwargs):
        new_time = random.uniform(0, self.max_time_to_enforce)
        eps = self._get_exp_params_array(epoch_id=epoch_id)
        eps["t"] = new_time

        # Overwrite the particle with a fresh sample (IQLE-style).
        particle = self._updater.sample()
        n_params = particle.shape[1]
        for i in range(n_params):
            p = particle[0][i]
            corresponding_expparam = self._model.modelparam_names[i]
            eps[corresponding_expparam] = p
        return eps


class FixedTimeTest(ExperimentDesignHueristic):
    """Test heuristic: every experiment uses t = 5."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

    def design_experiment(self, epoch_id=0, **kwargs):
        new_time = 5
        eps = self._get_exp_params_array(epoch_id=epoch_id)
        eps["t"] = new_time

        # Overwrite the particle with a fresh sample (IQLE-style).
        particle = self._updater.sample()
        n_params = particle.shape[1]
        for i in range(n_params):
            p = particle[0][i]
            corresponding_expparam = self._model.modelparam_names[i]
            eps[corresponding_expparam] = p
        return eps


class TimeList(ExperimentDesignHueristic):
    """Cycle through a fixed arithmetic progression of times up to max_time."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.max_time_to_enforce = kwargs["max_time_to_enforce"]
        min_t = self.max_time_to_enforce / self._num_experiments
        delta_t = (
            2 * min_t
        )  # effectively how many iterations each time is eventually learned for
        time_list = np.arange(min_t, self.max_time_to_enforce, delta_t)
        self.log_print(["delta t for heuristic:", delta_t])
        self.time_list = itertools.cycle(time_list)

    def design_experiment(self, epoch_id=0, **kwargs):
        new_time = next(self.time_list)
        eps = self._get_exp_params_array(epoch_id=epoch_id)
        eps["t"] = new_time
        return eps