from __future__ import absolute_import
from __future__ import print_function
import math
import numpy as np
import os as os
import sys as sys
import itertools
import pandas as pd
import time
from time import sleep
import random
import logging
import matplotlib.pyplot as plt
from matplotlib.gridspec import GridSpec
import pickle
import redis
import rq
import seaborn as sns
try:
from lfig import LatexFigure
except:
from qmla.shared_functionality.latex_figure import LatexFigure
# QMLA functionality
import qmla.analysis
import qmla.model_building_utilities as model_building_utilities
import qmla.get_exploration_strategy as get_exploration_strategy
import qmla.redis_settings as rds
import qmla.model_for_storage
from qmla.remote_bayes_factor import remote_bayes_factor_calculation
from qmla.remote_model_learning import remote_learn_model_parameters
import qmla.exploration_tree
import qmla.utilities
pickle.HIGHEST_PROTOCOL = 4
plt.switch_backend("agg")
__all__ = ["QuantumModelLearningAgent"]
[docs]class QuantumModelLearningAgent:
r"""
QMLA manager class.
Controls the infrastructure which determines which models are learned and compared.
By interpreting user defined :class:`~qmla.exploration_strategies.ExplorationStrategy`,
grows :class:`~qmla.ExplorationTree` objects which hold numerous models
on :class:`~qmla.BranchQMLA` objects.
All models on branches are learned and then compared.
The comparisons on a branch inform the next set of models generated on that tree.
First calls a series of setup functions to implement
infrastructure used throughout.
The available algorithms, and their corresponding methods, are:
- Quantum Hamilontian Learning:
:meth:`~qmla.QuantumModelLearningAgent.run_quantum_hamiltonian_learning`
- Quantum Hamilontian Learning multiple models:
:meth:`~qmla.QuantumModelLearningAgent.run_quantum_hamiltonian_learning_multiple_models`
- Quantum Model Learning Agent:
:meth:`~qmla.QuantumModelLearningAgent.run_complete_qmla`
:param ControlsQMLA qmla_controls: Storage for configuration of a QMLA instance.
:param dict model_priors: values of means/widths to enfore on given models,
specifically for further_qhl mode.
:param dict experimental_measurements: expectation values by time of the
underlying true/target model.
"""
def __init__(
self,
qmla_controls=None,
model_priors=None,
experimental_measurements=None,
**kwargs
):
self._start_time = time.time() # to measure run-time
# Configure this QMLA instance
if qmla_controls is None:
self.qmla_controls = qmla.controls_qmla.parse_cmd_line_args(args={})
else:
self.qmla_controls = qmla_controls
self.exploration_class = self.qmla_controls.exploration_class
# Basic settings, path definitions etc
self._fundamental_settings()
# Info on true model
self._true_model_definition()
# Parameters related to learning/comparing models
self._set_learning_and_comparison_parameters(
model_priors=model_priors,
experimental_measurements=experimental_measurements,
)
# Resources potentially reallocated
self._compute_base_resources()
# Redundant attributes, retained for legacy; to be removed
self._potentially_redundant_setup()
# Check if QMLA should run in parallel and set up accordingly
self._setup_parallel_requirements()
# QMLA core info stored on redis server
self._compile_and_store_qmla_info_summary()
# Set up infrastructure related to exploration strategies and tree management
self._setup_tree_and_exploration_strategies()
##########
# Section: Initialisation and setup
##########
[docs] def _fundamental_settings(self):
r"""Basic settings, path definitions etc."""
# Extract info from Controls
self.qmla_id = self.qmla_controls.qmla_id
self.redis_host_name = self.qmla_controls.host_name
self.redis_port_number = self.qmla_controls.port_number
self.log_file = self.qmla_controls.log_file
self.log_print(
[
"\nwithin QMLA, ES's qmla id is {}. True model={}".format(
self.exploration_class.qmla_id, self.exploration_class.true_model
)
]
)
self.qhl_mode = self.qmla_controls.qhl_mode
self.qhl_mode_multiple_models = self.qmla_controls.qhl_mode_multiple_models
self.latex_name_map_file_path = self.qmla_controls.latex_mapping_file
self.results_directory = self.qmla_controls.results_directory
self.debug_mode = self.qmla_controls.debug_mode
self.plot_level = self.qmla_controls.plot_level
# Databases for storing learning/comparison data
self.redis_databases = rds.get_redis_databases_by_qmla_id(
self.redis_host_name,
self.redis_port_number,
self.qmla_id,
)
self.redis_databases["any_job_failed"].set("Status", 0)
# Logistics
self.models_learned = []
self.timings = {
# track times spent in some subroutines
"inspect_job_crashes": 0,
"jobs_finished": 0,
}
self.call_counter = {
# track number of calls to some subroutines
"job_crashes": 0,
"jobs_finished": 0,
}
self.sleep_duration = 2
[docs] def _true_model_definition(self):
r"""Information related to true (target) model."""
self.true_model_constructor = self.exploration_class.true_model_constructor
self.true_model_name = self.true_model_constructor.name
self.true_model_dimension = self.true_model_constructor.num_qubits
self.true_model_constituent_operators = (
self.true_model_constructor.terms_matrices
)
self.true_model_num_params = self.true_model_constructor.num_terms
# self.true_model_constituent_terms_latex = [
# self.exploration_class.latex_name(term)
# for term in
# self.true_model_constructor.terms_names
# ]
self.true_model_constituent_terms_latex = (
self.true_model_constructor.terms_names_latex
)
self.true_param_list = self.exploration_class.true_params_list
self.true_param_dict = self.exploration_class.true_params_dict
self.true_model_branch = -1 # overwrite if true model is added to database
self.true_model_considered = False
self.true_model_found = False
self.true_model_id = -1
self.true_model_on_branhces = []
self.true_model_hamiltonian = self.exploration_class.true_hamiltonian
self.log_print(["True model:", self.true_model_name])
[docs] def _setup_tree_and_exploration_strategies(
self,
):
r"""Set up infrastructure."""
self.model_database = pd.DataFrame(
{
"model_id": [],
"model_name": [],
"latex_name": [],
"branch_id": [],
"f_score": [],
"model_storage_instance": [],
"model_constructor": [],
"branches_present_on": [],
"terms": [],
"latex_terms": [],
}
)
self.model_lists = {
# assumes maxmium 13 qubit-models considered
# to be checked when checking model_lists
# TODO generalise to max dim of Exploration Strategy
j: []
for j in range(1, 13)
}
self.all_bayes_factors = {}
self.bayes_factor_pair_computed = []
# Exploration Strategy setup
self.exploration_strategy_of_true_model = self.qmla_controls.exploration_rules
self.unique_exploration_strategy_instances = (
self.qmla_controls.unique_exploration_strategy_instances
)
# Keep track of models/branches
self.model_count = 0
self.highest_model_id = 0 # so first created model gets model_id=0
self.models_branches = {}
self.branch_highest_id = 0
self.model_name_id_map = {}
self.ghost_branches = {}
# Tree object for each exploration strategy
self.trees = {
gen: qmla.exploration_tree.ExplorationTree(
exploration_class=self.unique_exploration_strategy_instances[gen]
)
for gen in self.unique_exploration_strategy_instances
}
self.branches = {}
self.tree_count = len(self.trees)
self.tree_count_completed = np.sum(
[tree.is_tree_complete() for tree in self.trees.values()]
)
[docs] def _set_learning_and_comparison_parameters(
self,
model_priors,
experimental_measurements,
):
r"""Parameters related to learning/comparing models."""
# Miscellaneous
self.model_priors = model_priors
# Learning parameters, used by QInfer updates
self.num_particles = self.qmla_controls.num_particles
self.num_experiments = self.qmla_controls.num_experiments
# self.fraction_experiments_for_bf = self.exploration_class.fraction_experiments_for_bf
self.num_experiments_for_bayes_updates = self.num_experiments # TODO remove
self.bayes_threshold_lower = 1
self.bayes_threshold_upper = 100 # TODO get from ES
# Analysis infrastructure
self.model_f_scores = {}
self.model_precisions = {}
self.model_sensitivities = {}
self.bayes_factors_df = pd.DataFrame()
# Get probes used for learning
self.exploration_class.generate_probes(
# noise_level=self.exploration_class.probe_noise_level,
# minimum_tolerable_noise=0.0,
# tell it the max number of qubits required by any ES under consideration
probe_maximum_number_qubits=max(
[
gr.max_num_probe_qubits
for gr in self.qmla_controls.unique_exploration_strategy_instances.values()
]
)
)
self.probes_system = self.exploration_class.probes_system
self.probes_simulator = self.exploration_class.probes_simulator
self.probe_number = self.exploration_class.num_probes
sim_probe_keys = list(self.probes_simulator.keys())
self.log_print(
[
"Simulator probe keys (len {}):{}".format(
len(sim_probe_keys), sim_probe_keys
)
]
)
# Measurements of true model
self.experimental_measurements = experimental_measurements
self.experimental_measurement_times = sorted(
list(self.experimental_measurements.keys())
)
# Used for consistent plotting
self.times_to_plot = self.experimental_measurement_times
self.times_to_plot_reduced_set = self.times_to_plot[0::10]
self.probes_plot_file = self.qmla_controls.probes_plot_file
try:
self.probes_for_plots = pickle.load(open(self.probes_plot_file, "rb"))
except BaseException:
self.log_print(
["Could not load plot probes from {}".format(self.probes_plot_file)]
)
[docs] def _potentially_redundant_setup(
self,
):
r"""
Graveyard for deprecated ifnrastructure.
Attributes etc stored here which are not functionally used
within QMLA, but which are called somewhere,
and cause errors when omitted.
Should be stored here temporarily during development,
and removed entirely when sure they are not needed.
"""
# Some functionality towards time dependent models
self.use_time_dependent_true_model = False
self.num_time_dependent_true_params = 0
self.time_dependent_params = None
# Plotting data about pairwise comparisons
self.instance_learning_and_comparisons_path = os.path.join(
self.qmla_controls.plots_directory, "comparisons"
)
if not os.path.exists(self.instance_learning_and_comparisons_path):
try:
os.makedirs(self.instance_learning_and_comparisons_path)
except BaseException:
# reached at exact same time as another process; don't crash
pass
self.bayes_factors_store_times_file = str(
self.instance_learning_and_comparisons_path
+ "BayesFactorsPairsTimes_"
+ str(self.qmla_controls.long_id)
+ ".txt"
)
[docs] def _setup_parallel_requirements(self):
r"""Infrastructure for use when QMLA run in parallel."""
self.use_rq = self.qmla_controls.use_rq
self.rq_timeout = self.qmla_controls.rq_timeout
self.rq_log_file = self.log_file
# writeable file object to use for logging:
self.write_log_file = open(self.log_file, "a")
try:
self.redis_conn = redis.Redis(
host=self.redis_host_name, port=self.redis_port_number
)
parallel_enabled = True
except BaseException:
self.log_print("Importing rq failed: enforcing serial.")
parallel_enabled = False
self.run_in_parallel = parallel_enabled
[docs] def _compute_base_resources(self):
r"""
Compute the set of minimal resources for models to learn on.
In the case self.reallocate_resources==True,
models will receive resources (epochs, particles)
scaled by how complicated they are.
For instance, models with 4 parameters will receive
twice as many particles as a model with
2 parameters.
"""
# Decide if reallocating resources based on true ES.
if self.exploration_class.reallocate_resources:
base_num_qubits = 3
base_num_terms = 3
for op in self.exploration_class.initial_models:
if model_building_utilities.get_num_qubits(op) < base_num_qubits:
base_num_qubits = model_building_utilities.get_num_qubits(op)
num_terms = len(
model_building_utilities.get_constituent_names_from_name(op)
)
if num_terms < base_num_terms:
base_num_terms = num_terms
self.base_resources = {
"num_qubits": base_num_qubits,
"num_terms": base_num_terms,
"reallocate": True,
}
else:
self.base_resources = {"num_qubits": 1, "num_terms": 1, "reallocate": False}
[docs] def _compile_and_store_qmla_info_summary(self):
r"""
Gather info needed to run QMLA tasks and store remotely.
QMLA issues jobs to run remotely, namely for model (parameter)
learning and model comparisons (Bayes factors).
These jobs don't need access to all QMLA data, but do need
some common info, e.g. number of particles and epochs.
This function gathers all relevant information in a single dict,
and stores it on the redis server which all worker nodes have access to.
It also stores the probe sets required for the same tasks.
"""
number_hamiltonians_to_exponentiate = self.num_particles * (
2 * self.num_experiments
)
self.latex_config = str(
"$P_{"
+ str(self.num_particles)
+ "}E_{"
+ str(self.num_experiments)
+
# '}B_{' + str(self.num_experiments_for_bayes_updates) +
"}H_{"
+ str(number_hamiltonians_to_exponentiate)
+ r"}|\psi>_{"
+ str(self.probe_number)
+ "}PN_{"
+ str(self.exploration_class.probe_noise_level)
+ "}$"
)
self.qmla_settings = {
"probes_plot_file": self.probes_plot_file,
"plot_times": self.times_to_plot,
"true_name": self.true_model_name,
"true_oplist": self.true_model_constituent_operators,
"true_model_terms_params": self.true_param_list,
"true_param_dict": self.true_param_dict,
"true_model_constructor": self.true_model_constructor,
"num_particles": self.num_particles,
"num_experiments": self.num_experiments,
"results_directory": self.results_directory,
"plots_directory": self.qmla_controls.plots_directory,
"debug_mode": self.debug_mode,
"plot_level": self.plot_level,
"figure_format": self.qmla_controls.figure_format,
"long_id": self.qmla_controls.long_id,
"model_priors": self.model_priors, # could be path to unpickle within model?
"experimental_measurements": self.experimental_measurements,
"base_resources": self.base_resources,
"store_particles_weights": False, # TODO from exploration strategy or unneeded
"qhl_plots": False, # TODO get from exploration strategy
"experimental_measurement_times": self.experimental_measurement_times,
"num_probes": self.probe_number, # from exploration strategy or unneeded,
"run_info_file": self.qmla_controls.run_info_file,
}
self.log_print(
["QMLA settings figure_format:", self.qmla_settings["figure_format"]]
)
# Store qmla_settings and probe dictionaries on the redis database,
# accessible by all workers.
# These are retrieved by workers to set
# parameters to use when learning/comparing models.
compressed_qmla_core_info = pickle.dumps(self.qmla_settings, protocol=4)
compressed_probe_dict = pickle.dumps(self.probes_system, protocol=4)
compressed_sim_probe_dict = pickle.dumps(self.probes_simulator, protocol=4)
qmla_core_info_database = self.redis_databases["qmla_core_info_database"]
qmla_core_info_database.set("qmla_settings", compressed_qmla_core_info)
qmla_core_info_database.set("probes_system", compressed_probe_dict)
qmla_core_info_database.set("probes_simulator", compressed_sim_probe_dict)
self.qmla_core_info_database = {
"qmla_settings": self.qmla_settings,
"probes_system": self.probes_system,
"probes_simulator": self.probes_simulator,
}
self.log_print(["Saved QMLA instance info to ", qmla_core_info_database])
##########
# Section: Calculation of models parameters and Bayes factors
##########
[docs] def learn_models_on_given_branch(self, branch_id, blocking=False):
r"""
Launches jobs to learn all models on the specified branch.
Models which are on the branch but have already been learned are not re-learned.
For each remaining model on the branch,
:meth:`~qmla.QuantumModelLearningAgent.learn_model` is called.
The branch is added to the redis database `active_branches_learning_models`,
indicating that branch_id has currently got models in the learning phase.
This redis database is monitored by the :meth:`~qmla.QuantumModelLearningAgent.learn_models_until_trees_complete`.
When all models registered on the branch have completed, it is recorded, allowing QMLA
to perform the next stage: either spawning a new branch from this branch, or
continuing to the final stage of QMLA.
This method can block, meaning it waits for a model's learning to complete
before proceeding. If in parallel, do not block as model learning
won't be launched until the previous model has completed.
:param int branch_id: unique QMLA branch ID to learn models of.
:param bool use_rq: whether to implement learning via RQ workers.
Argument only used when passed to :meth:`QuantumModelLearningAgent.learn_model`.
:param bool blocking: whether to wait on all models' learning before proceeding.
"""
model_list = self.branches[branch_id].resident_models
num_models_already_set_this_branch = self.branches[
branch_id
].num_precomputed_models
unlearned_models_this_branch = self.branches[branch_id].unlearned_models
# Update redis database
active_branches_learning_models = self.redis_databases[
"active_branches_learning_models"
]
active_branches_learning_models.set(
int(branch_id), num_models_already_set_this_branch
)
# Learn models
self.log_print(
[
"Branch {} has models: \nprecomputed: {} \nunlearned: {}".format(
branch_id,
self.branches[branch_id].precomputed_models,
unlearned_models_this_branch,
)
]
)
for model_name in unlearned_models_this_branch:
self.learn_model(
model_name=model_name, branch_id=branch_id, blocking=blocking
)
self.log_print(["Learning models from branch {} finished.".format(branch_id)])
[docs] def learn_model(self, model_name, branch_id, blocking=False):
r"""
Learn a given model by calling the standalone model learning functionality.
The model is learned by launching a job either locally or to the job queue.
Model learning is implemented by :func:`remote_learn_model_parameters`,
which takes a unique model name (string) and distills the terms to learn.
If running locally, QMLA core info is passed.
Else if RQ workers are being used, it retrieves QMLA info from the shared redis
database, and the function is launched via rq's `Queue.enqueue` function.
This puts a task on the redis `Queue` - the task is the implementation of
:func:`remote_learn_model_parameters`.
The effect is either to learn the model here, or else to have launched a job
where it will be learned remotely, so nothing is returned.
:param str model_name: string uniquely representing a model
:param int branch_id: unique branch ID within QMLA environment
:param bool use_rq: whether to use RQ workers, or implement locally
:param bool blocking: whether to wait on model to finish learning before proceeding.
"""
model_already_exists = self._check_model_exists(
model_name=model_name,
)
if not model_already_exists:
self.log_print(
["Model {} not yet in database: can not be learned.".format(model_name)]
)
else:
model_id = self._get_model_id_from_name(model_name=model_name)
if model_id not in self.models_learned:
self.models_learned.append(model_id)
if self.run_in_parallel and self.use_rq:
# get access to the RQ queue
queue = rq.Queue(
self.qmla_id,
connection=self.redis_conn,
is_async=self.use_rq,
default_timeout=self.rq_timeout,
)
self.log_print(
[
"Redis queue object:",
queue,
"has job waiting IDs:",
queue.job_ids,
]
)
# send model-learning, as task to job queue
queued_model = queue.enqueue(
remote_learn_model_parameters,
result_ttl=-1,
# ttl = -1,
job_timeout=self.rq_timeout,
name=model_name,
model_id=model_id,
exploration_rule=self.branches[branch_id].exploration_strategy,
branch_id=branch_id,
remote=True,
host_name=self.redis_host_name,
port_number=self.redis_port_number,
qid=self.qmla_id,
log_file=self.rq_log_file,
)
self.log_print(["Model {} on rq job {}".format(model_id, queued_model)])
if blocking:
# wait for result when called.
self.log_print(
[
"Blocking: waiting for {} to finish on redis queue".format(
model_name
)
]
)
while not queued_model.is_finished:
t_init = time.time()
some_job_failed = queued_model.is_failed
self.timings["jobs_finished"] += time.time() - t_init
self.call_counter["jobs_finished"] += 1
if some_job_failed:
self.log_print(
["Model", model_name, "has failed on remote worker."]
)
raise NameError("Remote QML failure")
break
time.sleep(self.sleep_duration)
self.log_print(["Blocking RQ - model learned:", model_name])
else:
# run model learning fnc locally
self.log_print(
[
"Locally calling learn model function.",
"model:",
model_name,
" ID:",
model_id,
]
)
# pass probes directly instead of unpickling from redis
# database
self.qmla_settings["probe_dict"] = self.probes_system
remote_learn_model_parameters(
name=model_name,
model_id=model_id,
exploration_rule=self.branches[branch_id].exploration_strategy,
branch_id=branch_id,
qmla_core_info_dict=self.qmla_settings,
remote=True,
host_name=self.redis_host_name,
port_number=self.redis_port_number,
qid=self.qmla_id,
log_file=self.rq_log_file,
)
[docs] def compare_model_pair(
self,
model_a_id,
model_b_id,
return_job=False,
branch_id=None,
remote=True,
wait_on_result=False,
):
r"""
Launch the comparison between two models.
Either locally or by passing to a job queue,
run :func:`remote_bayes_factor_calculation`
for a pair of models specified by their IDs.
:param int model_a_id: unique ID of one model of the pair
:param int model_b_id: unique ID of other model of the pair
:param bool return_job:
True - return the rq job object from this function call.
False (default) - return nothing.
:param int branch_id: unique branch ID, if this model pair
are on the same branch
:param bool remote: whether to run the job remotely or locally
True - job is placed on queue for RQ worker
False - function is computed locally immediately
:param bool wait_on_result: whether to wait for the outcome
or proceed after sending the job to the queue.
:returns bayes_factor: the Bayes factor calculated between the two models,
i.e. BF(m1,m2) where m1 is the lower model id. Only returned when
`wait_on_result==True`.
"""
unique_id = model_building_utilities.unique_model_pair_identifier(
model_a_id, model_b_id
)
if unique_id not in self.bayes_factor_pair_computed:
self.bayes_factor_pair_computed.append(unique_id)
# Launch comparison, either remotely or locally
if self.use_rq:
# launch remotely
from rq import Connection, Queue, Worker
queue = Queue(
self.qmla_id,
connection=self.redis_conn,
is_async=self.use_rq,
default_timeout=self.rq_timeout,
)
# the function object is the first argument to RQ enqueue function
job = queue.enqueue(
remote_bayes_factor_calculation,
result_ttl=-1,
# ttl = -1,
job_timeout=self.rq_timeout,
model_a_id=model_a_id,
model_b_id=model_b_id,
branch_id=branch_id,
times_record=self.bayes_factors_store_times_file,
bf_data_folder=self.instance_learning_and_comparisons_path,
# num_times_to_use=self.num_experiments_for_bayes_updates,
bayes_threshold=self.bayes_threshold_lower,
host_name=self.redis_host_name,
port_number=self.redis_port_number,
qid=self.qmla_id,
log_file=self.rq_log_file,
)
self.log_print(
[
"Bayes factor calculation queued. Models {}/{}".format(
model_a_id, model_b_id
)
]
)
if wait_on_result == True:
while not job.is_finished:
if job.is_failed:
raise ("Remote BF failure")
sleep(self.sleep_duration)
elif return_job == True:
return job
else:
# run comparison locally
remote_bayes_factor_calculation(
model_a_id=model_a_id,
model_b_id=model_b_id,
bf_data_folder=self.instance_learning_and_comparisons_path,
times_record=self.bayes_factors_store_times_file,
# num_times_to_use=self.num_experiments_for_bayes_updates,
branch_id=branch_id,
bayes_threshold=self.bayes_threshold_lower,
host_name=self.redis_host_name,
port_number=self.redis_port_number,
qid=self.qmla_id,
log_file=self.rq_log_file,
)
if wait_on_result == True:
pair_id = model_building_utilities.unique_model_pair_identifier(
model_a_id, model_b_id
)
bf_from_db = self.redis_databases["bayes_factors_db"].get(pair_id)
bayes_factor = float(bf_from_db)
return bayes_factor
[docs] def compare_model_set(
self,
model_id_list=None,
pair_list=None,
remote=True,
wait_on_result=False,
recompute=False,
):
r"""
Launch pairwise model comparison for a set of models.
If `pair_list` is specified, those pairs are compared;
otherwise all pairs within `model_id_list` are compared.
Pairs are sent to :meth:`~qmla.QuantumModelLearningAgent.compare_model_pair`
to be computed either locally or on a job queue.
:param list model_id_list: list of model names to compute comparisons between
:param list pair_list: list of tuples specifying model IDs to compare
:param bool remote:
passed directly to :meth:`~qmla.QuantumModelLearningAgent.compare_model_pair`
:param bool wait_on_results:
passed directly to :meth:`~qmla.QuantumModelLearningAgent.compare_model_pair`
:param bool recompute: whether to force comparison even if a pair has
been compared previously
"""
if pair_list is None:
pair_list = list(itertools.combinations(model_id_list, 2))
self.log_print(["compare_model_set with BF pair list:", pair_list])
remote_jobs = []
for pair in pair_list:
unique_id = model_building_utilities.unique_model_pair_identifier(
pair[0], pair[1]
)
if unique_id not in self.bayes_factor_pair_computed or recompute == True:
# ie not yet considered
remote_jobs.append(
self.compare_model_pair(
pair[0],
pair[1],
remote=remote,
return_job=wait_on_result,
)
)
if wait_on_result and self.use_rq:
self.log_print(
[
"Waiting on result of ",
"Bayes comparisons from given model list:",
model_id_list,
"\n pair list:",
pair_list,
]
)
for job in remote_jobs:
self.log_print(["Monitoring job {}".format(job)])
while not job.is_finished:
if job.is_failed:
self.log_print(["Model comparison job failed:", job])
raise NameError("Remote job failure")
time.sleep(self.sleep_duration)
else:
self.log_print(
[
"Not waiting on results of BF calculations",
"since we're not using RQ workers here.",
]
)
[docs] def compare_models_within_branch(
self, branch_id, pair_list=None, remote=True, recompute=False
):
r"""
Launch pairwise model comparison for all models on a branch.
If `pair_list` is specified, those pairs are compared;
otherwise pairs are retrieved from the `pairs_to_compare`
attribute of the branch, which is usually all-to-all.
Pairs are sent to :meth:`~qmla.QuantumModelLearningAgent.compare_model_pair`
to be computed either locally or on a job queue.
:param branch_id: unique ID of the branch within the QMLA environment
:param list pair_list: list of tuples specifying model IDs to compare
:param bool remote:
passed directly to :meth:`~qmla.QuantumModelLearningAgent.compare_model_pair`
:param bool wait_on_results:
passed directly to :meth:`~qmla.QuantumModelLearningAgent.compare_model_pair`
:param bool recompute: whether to force comparison even if a pair has
been compared previously
"""
if pair_list is None:
pair_list = self.branches[branch_id].pairs_to_compare
self.log_print(
[
"compare_models_within_branch for branch {} has {} pairs: {}".format(
branch_id, len(pair_list), pair_list
)
]
)
# Set branch as active on redis db
active_branches_bayes = self.redis_databases["active_branches_bayes"]
active_branches_bayes.set(int(branch_id), 0)
# Compare model pairs
for a, b in pair_list:
if a != b:
unique_id = model_building_utilities.unique_model_pair_identifier(a, b)
if (
unique_id not in self.bayes_factor_pair_computed
or recompute == True
):
# ie not yet considered or recomputing
self.compare_model_pair(
a,
b,
remote=remote,
branch_id=branch_id,
)
elif unique_id in self.bayes_factor_pair_computed:
# if this is already computed,
# tell this branch not to wait on it.
active_branches_bayes.incr(int(branch_id), 1)
[docs] def process_model_pair_comparison(
self,
a=None,
b=None,
pair=None,
):
r"""
Process a comparison between two models.
The comparison (Bayes factor) result is retrieved from the
redis database and used to update data on the models.
:param int a: one of the model's unique ID
:param int b: one of the model's unique ID
:param tuple pair: alternative mechanism to provide the model IDs,
effectively (a,b)
:return: ID of the model which is deemed superior
from this pair
"""
bayes_factors_db = self.redis_databases["bayes_factors_db"]
if pair is not None:
model_ids = pair.split(",")
a = float(model_ids[0])
b = float(model_ids[1])
elif a is not None and b is not None:
a = float(a)
b = float(b)
pair = model_building_utilities.unique_model_pair_identifier(a, b)
else:
self.log_print(
[
"Must pass either two model ids, or a \
pair name string, to process Bayes factors."
]
)
try:
bayes_factor = float(bayes_factors_db.get(pair))
except TypeError:
self.log_print(
[
"On bayes_factors_db for pair {} = {}".format(
pair, bayes_factors_db.get(pair)
)
]
)
# bayes_factor refers to calculation BF(pair), where pair
# is always defined (lower, higher) for consistency
lower_id = min(a, b)
higher_id = max(a, b)
self.log_print(["processing BF {}/{}".format(lower_id, higher_id)])
mod_low = self.get_model_storage_instance_by_id(lower_id)
mod_high = self.get_model_storage_instance_by_id(higher_id)
if higher_id in mod_low.model_bayes_factors:
mod_low.model_bayes_factors[higher_id].append(bayes_factor)
else:
mod_low.model_bayes_factors[higher_id] = [bayes_factor]
if lower_id in mod_high.model_bayes_factors:
mod_high.model_bayes_factors[lower_id].append((1.0 / bayes_factor))
else:
mod_high.model_bayes_factors[lower_id] = [(1.0 / bayes_factor)]
if bayes_factor > self.bayes_threshold_lower:
champ = mod_low.model_id
elif bayes_factor < (1.0 / self.bayes_threshold_lower):
champ = mod_high.model_id
else:
champ = None
self.log_print(
[
"Neither model sufficiently better to earn point between {}/{}. BF={}".format(
mod_low.model_id, mod_high.model_id, bayes_factor
)
]
)
return champ
[docs] def process_model_set_comparisons(
self,
model_list,
):
r"""
Process comparisons between a set of models.
Pairwise comparisons are retrieved and processed by
:meth:`~qmla.QuantumModelLearningAgent.process_model_pair_comparison`,
which informs the superior model.
For each pairwise comparison a given model wins, it receives a single point.
All comparisons are weighted evenly.
Model points are gathered; the model with most points is
deemed the champion of the set.
If a subset of models have the same (highest) number of points,
that subset is compared directly, with the nominated champion
deemed the champion of the wider set.
:param list model_list: list of model names to compete
:return: unique model ID of the champion model within the set
"""
# Establish pairs to check comparisons between
pair_list = list(itertools.combinations(model_list, 2))
# Process result for each pair
models_points = {mod: 0 for mod in model_list}
for pair in pair_list:
mod1, mod2 = pair
if mod1 != mod2:
res = self.process_model_pair_comparison(a=mod1, b=mod2)
if res is not None:
models_points[res] += 1
self.log_print(
[
"[process_model_set_comparisons]",
"Point to",
res,
"(comparison {}/{})".format(mod1, mod2),
]
)
# Analyse pairwise competition
self.log_print(["Models points: \n{}".format(models_points)])
max_points = max(models_points.values())
models_with_max_points = [
key for key, val in models_points.items() if val == max_points
]
if len(models_with_max_points) > 1:
self.log_print(
[
"Multiple models \
have same number of points in process_model_set_comparisons:",
models_with_max_points,
"\n Model points:\n",
models_points,
]
)
self.log_print(["After re-comparison, points:\n", models_points])
self.compare_model_set(
model_id_list=models_with_max_points,
remote=True,
recompute=True, # recompute here b/c deadlock last time
wait_on_result=True,
)
champ_id = self.process_model_set_comparisons(
models_with_max_points,
)
else:
self.log_print(["After comparing list, points:\n", models_points])
champ_id = max(models_points, key=models_points.get)
return champ_id
[docs] def process_comparisons_within_branch(self, branch_id, pair_list=None):
r"""
Process comparisons between models on the same branch.
(Similar functionality to
:meth:`~qmla.QuantumModelLearningAgent.process_model_set_comparisons`,
but additionally updates some branch infrastructure, such as updating
the branch's `champion_id`, `bayes_points` attributes).
Pairwise comparisons are retrieved and processed by
:meth:`~qmla.QuantumModelLearningAgent.process_model_pair_comparison`,
which informs the superior model.
For each pairwise comparison a given model wins, it receives a single point.
All comparisons are weighted evenly.
Model points are gathered; the model with most points is
deemed the champion of the set.
If a subset of models have the same (highest) number of points,
that subset is compared directly, with the nominated champion
deemed the champion of the wider set.
:param int branch_id: unique ID of the branch whose models to compare
:returns:
- models_points: the points (number of comparisons won)
of each model on the branch
- champ_id: unique model ID of the champion model within the set
"""
branch = self.branches[branch_id]
active_models_in_branch = branch.resident_model_ids
# Establish pairs to check comparisons between
if pair_list is None:
pair_list = branch.pairs_to_compare
self.log_print(
[
"Pair list not given for branch {}, generated:{}".format(
branch_id, pair_list
),
]
)
else:
self.log_print(["pair list given to branch processing:", pair_list])
# Process result for each pair
models_points = {k: 0 for k in active_models_in_branch}
for mod1, mod2 in pair_list:
if mod1 != mod2:
res = self.process_model_pair_comparison(a=mod1, b=mod2)
if res is not None:
try:
models_points[res] += 1
except BaseException:
models_points[res] = 1
self.log_print(
[
"[branch {} comparison {}/{}] ".format(branch_id, mod1, mod2),
"Point to",
res,
]
)
self.log_print(["Comparisons complete on branch {}".format(branch_id)])
# Update branch with these results to determine branch champion
branch.update_branch(pair_list=pair_list, models_points=models_points)
# If the given results are not sufficient for the ES to determine a branch champion,
# reconsider a subset of models
while not branch.is_branch_champion_set:
reduced_model_set = branch.joint_branch_champions
self.log_print(
["Branch champion not determined.", "Reconsidering:", reduced_model_set]
)
self.compare_model_set(
model_id_list=reduced_model_set,
remote=True,
recompute=False,
wait_on_result=True,
)
# Pass result of compare_model_set to branch to decide if sufficient to choose champion
models_to_recompare = list(itertools.combinations(reduced_model_set, 2))
self.process_comparisons_within_branch(
branch_id=branch_id, pair_list=models_to_recompare
)
return branch.champion_id
##########
# Section: routines to implement tree-based QMLA
##########
[docs] def learn_models_until_trees_complete(
self,
):
r"""
Iteratively learn/compare/generate models on exploration strategy trees.
Each :class:`~qmla.exploration_strategies.ExplorationStrategy` has a unique :class:`~qmla.QMLATree``.
Trees hold sets of models on :class:`~qmla.BranchTree` objects.
Models on a each branch are learned through :meth:`learn_models_on_given_branch`.
Any model which has previously been considered defaults to the earlier
instance of that model, rather than repeating the calculation.
When all models on a branch are learned, they are all compared
through :meth:`compare_models_within_branch`.
When a branch has completed learning and comparisons of models,
the corresponding tree is checked to see if it has finished proposing
models, through :meth:`~qmla.ExplorationTree.is_tree_complete`.
If the tree is not complete, the :meth:`~qmla.ExplorationTree.next_layer`
method is called to generate the next branch on that tree.
The next branch can correspond to `spawn` or `prune` stages of the
tree's :class:`~qmla.exploration_strategies.ExplorationStrategy`, but QMLA is ambivalent to the
inner workings of the tree/exploration strategy: a branch is
simply a set of models to learn and compare.
When all trees have completed learning, this method terminates.
"""
# Get redis databases
active_branches_learning_models = self.redis_databases[
"active_branches_learning_models"
]
active_branches_bayes = self.redis_databases["active_branches_bayes"]
# Launch learning on initial branches
for b in self.branches:
self.learn_models_on_given_branch(
b,
blocking=False,
)
self.log_print(
["Starting learning for initial branches:", list(self.branches.keys())]
)
# Iteratively learn/compare/spawn until all trees declare completion
self.log_print(["Entering while loop: learning/comparing/spawning models."])
ctr = 0
while self.tree_count_completed < self.tree_count:
# get most recent branches on redis database
branch_ids_on_db = list(active_branches_learning_models.keys())
branch_ids_on_db = [int(b) for b in branch_ids_on_db]
# check if any job has crashed
if self.run_in_parallel:
sleep(self.sleep_duration)
self._inspect_remote_job_crashes()
# loop through active branches
for branch_id in branch_ids_on_db:
# inspect if branch has finished learning
num_models_learned_on_branch = int(
active_branches_learning_models.get(branch_id)
)
if (
not self.branches[branch_id].model_learning_complete
and num_models_learned_on_branch
== self.branches[branch_id].num_models
):
self.log_print(
["All models on branch {} learned".format(branch_id)]
)
self.branches[branch_id].model_learning_complete = True
for mod_id in self.branches[branch_id].resident_model_ids:
mod = self.get_model_storage_instance_by_id(mod_id)
mod.model_update_learned_values()
# launch comparisons
self.compare_models_within_branch(branch_id)
elif ctr % 100 == 0:
self.log_print(
[
"Ctr {} branch {} has {} of {} models learned; model_learning_complete: {}".format(
ctr,
branch_id,
int(num_models_learned_on_branch),
self.branches[branch_id].num_models,
self.branches[branch_id].model_learning_complete,
)
]
)
for branchID_bytes in active_branches_bayes.keys():
branch_id = int(branchID_bytes)
num_comparisons_complete_on_branch = active_branches_bayes.get(
branchID_bytes
)
if not self.branches[branch_id].comparisons_complete and (
int(num_comparisons_complete_on_branch)
== self.branches[branch_id].num_model_pairs
):
self.branches[branch_id].comparisons_complete = True
# analyse resulting bayes factors
self.log_print(["Branch {} comparisons starting".format(branch_id)])
self.process_comparisons_within_branch(branch_id)
self.log_print(["Branch {} comparisons complete".format(branch_id)])
# check if tree is complete
if self.branches[branch_id].tree.is_tree_complete():
self.tree_count_completed += 1
self.log_print(
[
"Tree complete:",
self.branches[branch_id].exploration_strategy,
"Number of trees now completed:",
self.tree_count_completed,
]
)
else:
# tree not complete -> launch next set of models
self.spawn_from_branch(
branch_id=branch_id,
)
elif ctr % 100 == 0:
self.log_print(
[
"Ctr {} branch {} has {} out of {} comparisons complete; comparisons_complete: {}".format(
ctr,
branch_id,
int(num_comparisons_complete_on_branch),
self.branches[branch_id].num_model_pairs,
self.branches[branch_id].comparisons_complete,
)
]
)
ctr += 1
self.log_print(
[
"{} trees have completed. Waiting on final comparisons".format(
self.tree_count_completed
)
]
)
# Allow any branches which have just started to finish
still_learning = True
while still_learning:
branch_ids_on_db = list(active_branches_learning_models.keys())
for branchID_bytes in branch_ids_on_db:
branch_id = int(branchID_bytes)
if (
int(active_branches_learning_models.get(branch_id))
== self.branches[branch_id].num_models
) and self.branches[branch_id].model_learning_complete == False:
self.branches[branch_id].model_learning_complete = True
self.compare_models_within_branch(branch_id)
for mod_id in self.branches[branch_id].resident_model_ids:
mod = self.get_model_storage_instance_by_id(mod_id)
mod.model_update_learned_values()
if branchID_bytes in active_branches_bayes:
num_comparisons_complete_on_branch = active_branches_bayes.get(
branchID_bytes
)
if (
int(num_comparisons_complete_on_branch)
== self.branches[branch_id].num_model_pairs
) and (self.branches[branch_id].comparisons_complete == False):
self.branches[branch_id].comparisons_complete = True
self.process_comparisons_within_branch(branch_id)
if np.all(
np.array(
[self.branches[b].model_learning_complete for b in self.branches]
)
) and np.all(
np.array([self.branches[b].comparisons_complete for b in self.branches])
):
# break out of this while loop
still_learning = False
# Finalise all trees.
for tree in self.trees.values():
tree.finalise_tree(
model_names_ids=self.model_name_id_map,
)
self.log_print(["Learning stage complete on all trees."])
[docs] def spawn_from_branch(
self,
branch_id,
):
r"""
Retrieve the next set of models and place on a new branch.
By checking the :class:`~qmla.tree.QMLATree`` associated with the `branch_id` used
to call this method, call :meth:`ExplorationTree.next_layer`, which returns
a set of models to place on a new branch, as well as which models therein
to compare. These are passed to :meth:`new_branch`, constructing a new branch
in the QMLA environment. The generated new branch then has all its models
learned by calling :meth:`~qmla.QuantumModelLearningAgent.learn_models_on_given_branch`.
:meth:`~qmla.ExplorationTree.next_layer` is in control of how to select the next set of models,
usually either by calling the :class:`~qmla.exploration_strategies.ExplorationStrategy`'s
:meth:`~qmla.exploration_strategies.ExplorationStrategy.generate_models` or
:meth:`~qmla.exploration_strategies.ExplorationStrategy.tree_pruning` methods.
This allows the user to define how models are generated,
given access to the comparisons of the previous branch,
or how the tree is pruned, e.g. by performing preliminary
parent/child branch champion comparisons.
:param int branch_id: unique ID of the branch which has completed
"""
model_list = self.branches[branch_id].ranked_models
model_names = [self.model_name_id_map[mod_id] for mod_id in model_list]
new_models, models_to_compare = self.branches[branch_id].tree.next_layer(
model_list=model_names,
# can model_list be functionally replaced by info in branch_model_points?
model_names_ids=self.model_name_id_map,
called_by_branch=branch_id,
branch_model_points=self.branches[branch_id].bayes_points,
evaluation_log_likelihoods=self.branches[
branch_id
].evaluation_log_likelihoods,
model_dict=self.model_lists, # only used by FullAccessNVCentre TODO remove properly and don't pass
)
self.log_print(
[
"After model generation for ES",
self.branches[branch_id].exploration_strategy,
"\nnew models:",
new_models,
]
)
# Generate new QMLA level branch
new_branch_id = self.new_branch(
model_list=new_models,
pairs_to_compare_by_names=models_to_compare,
exploration_strategy=self.branches[branch_id].exploration_strategy,
spawning_branch=branch_id,
)
# Learn models on the new branch
self.learn_models_on_given_branch(
new_branch_id,
blocking=False,
)
[docs] def new_branch(
self,
model_list,
pairs_to_compare="all",
pairs_to_compare_by_names=None,
exploration_strategy=None,
spawning_branch=0,
):
r"""
Add a set of models to a new QMLA branch.
Branches have a unique id within QMLA, but belong to a single
tree, where each tree corresponds to a single exploration strategy.
:param list model_list: strings corresponding to models to
place in the branch
:param pairs_to_compare: set of model pairs to perform comparisons between.
'all' (deafult) means all models in `model_list` are set to compare.
Otherwise a list of tuples of model IDs to compare
:type pairs_to_compare: str or list
:param str exploration_strategy: exploration strategy identifer;
used to get the unique tree object corresponding to an exploration strategy,
which is then used to host the branch.
:param int spawning_branch: branch id which is the parent of the new branch.
:return: branch id which uniquely identifies the new branch
within the QMLA environment.
"""
model_list = list(set(model_list)) # remove possible duplicates
branch_id = int(self.branch_highest_id) + 1
self.branch_highest_id = branch_id
if exploration_strategy is None:
exploration_strategy = self.exploration_strategy_of_true_model
exploration_tree = self.trees[exploration_strategy]
this_branch_models = {}
model_id_list = []
pre_computed_models = []
for model in model_list:
# add_model_to_database returns whether adding model was successful
# if false, that's because it's already been computed
add_model_info = self.add_model_to_database(
model,
branch_id=branch_id,
exploration_tree=exploration_tree,
)
already_computed = not (add_model_info["is_new_model"])
model_id = add_model_info["model_id"]
this_branch_models[model_id] = model
model_id_list.append(model_id)
# register if new model
if already_computed:
pre_computed_models.append(model)
self.log_print(
[
"Model {} computed already: {} -> ID {}".format(
model,
already_computed,
model_id,
),
]
)
model_storage_instances = {
m: self.get_model_storage_instance_by_id(m)
for m in list(this_branch_models.keys())
}
# Start new branch on corresponding exploration strategy tree
if pairs_to_compare_by_names is not None:
if pairs_to_compare_by_names == "all":
pairs_to_compare = "all"
else:
self.log_print(["Getting model IDs to set comparison subset"])
try:
pairs_to_compare = [
(
self.model_database[
self.model_database.model_name == m1
].model_id.item(),
self.model_database[
self.model_database.model_name == m2
].model_id.item(),
)
for m1, m2 in pairs_to_compare_by_names
]
self.log_print(["IDs:", pairs_to_compare])
except BaseException:
self.log_print(
[
"Failed to unpack pairs_to_compare_by_names:\n",
pairs_to_compare_by_names,
]
)
raise
self.branches[branch_id] = exploration_tree.new_branch_on_tree(
branch_id=branch_id,
models=this_branch_models,
pairs_to_compare=pairs_to_compare,
model_storage_instances=model_storage_instances,
precomputed_models=pre_computed_models,
spawning_branch=spawning_branch,
)
return branch_id
[docs] def add_model_to_database(
self, model, exploration_tree, branch_id=-1, force_create_model=False
):
r"""
Considers adding a model to QMLA's database of models.
Checks whether the nominated model is already present;
if not generates a model instance and
stores pertinent details in the model database.
:param str model: name of model to consider
:param float branch_id: branch id to associate this model with,
if the model is new.
:param bool force_create_model:
True: add model even if the name is found already.
False: (default) check if the model exists before adding
:return dict add_model_output:
`is_new_model` : bool, whether model is new (True) or has already been added (False)
model_id: unique model ID for the model, whether new or existing
"""
model_name = model_building_utilities.alph(model)
self.log_print(
["Trying to add model to DB:", model_name, " with ET ", exploration_tree]
)
# Add model if not yet considered or told to force create
if self._consider_new_model(model_name) == "New" or force_create_model == True:
# create new model instance
model_num_qubits = qmla.model_building_utilities.get_num_qubits(model_name)
model_id = self.highest_model_id + 1
self.model_lists[model_num_qubits].append(model_name)
self.log_print(
[
"Model {} not previously considered -- adding with ID {}".format(
model_name, model_id
)
]
)
# Generate model storage instance
model_constructor = exploration_tree.exploration_class.model_constructor(
name=model_name
)
model_storage_instance = qmla.model_for_storage.ModelInstanceForStorage(
model_name=model_name,
model_id=int(model_id),
true_oplist=self.true_model_constituent_operators,
true_model_terms_params=self.true_param_list,
qid=self.qmla_id,
qmla_core_info_database=self.qmla_core_info_database,
plot_probes=self.probes_for_plots,
host_name=self.redis_host_name,
port_number=self.redis_port_number,
log_file=self.log_file,
)
# Add to the model database
f_score = np.round(
self.compute_model_f_score(
model_id=model_id,
model_name=model_name,
model_constructor=model_constructor,
exploration_class=exploration_tree.exploration_class,
),
2,
)
terms = qmla.model_building_utilities.get_constituent_names_from_name(
model_name
)
running_db_new_row = pd.Series(
{
"model_id": int(model_id),
"model_name": model_name,
"latex_name": model_constructor.name_latex,
"branch_id": int(branch_id),
"f_score": f_score,
"model_storage_instance": model_storage_instance,
"branches_present_on": [int(branch_id)],
"model_constructor": model_constructor,
"terms": terms,
"latex_terms": model_constructor.terms_names_latex,
}
)
num_rows = len(self.model_database)
self.model_database.loc[num_rows] = running_db_new_row
model_added = True
if model_building_utilities.alph(model) == model_building_utilities.alph(
self.true_model_name
):
self.true_model_id = model_id
self.true_model_considered = True
self.true_model_branch = branch_id
self.true_model_on_branhces = [branch_id]
self.log_print(["True model has ID", model_id])
self.highest_model_id = model_id
self.model_name_id_map[model_id] = model_name
self.model_count += 1
self.models_branches[model_id] = int(branch_id)
else:
# do not create new model instance
model_added = False
self.log_print(["Model not added: {}".format(model_name)])
try:
model_id = self._get_model_id_from_name(model_name=model_name)
self.log_print(["Previously considered as model ID ", model_id])
self.model_database[
self.model_database.model_id == model_id
].branches_present_on.item().append(int(branch_id))
if model_id == self.true_model_id:
self.true_model_on_branhces.append(model_id)
except BaseException:
self.log_print(
[
"Couldn't find model id for model:",
model_name,
"model_names_ids:",
self.model_name_id_map,
]
)
raise
add_model_output = {
"is_new_model": model_added,
"model_id": model_id,
}
return add_model_output
def finalise_instance(self):
self.compute_statistical_metrics_by_generation()
self.exploration_class.exploration_strategy_finalise()
if self.qhl_mode_multiple_models:
self.log_print(["No special analysis for this mode"])
elif self.qhl_mode:
self.log_print(["No special analysis for this mode"])
else:
self.finalise_qmla()
[docs] def finalise_qmla(self):
r"""
Steps to end QMLA algorithm, such as storing analytics.
"""
champ_model = self.get_model_storage_instance_by_id(self.champion_model_id)
# compute full dynamics for branch champions
champ_model.compute_expectation_values(
times=self.times_to_plot,
)
self.branch_champions = [self.branches[b].champion_id for b in self.branches]
self.log_print(["Branch champions:", self.branch_champions])
for bc in self.branch_champions:
bc_mod = self.get_model_storage_instance_by_id(bc)
bc_mod.compute_expectation_values(times=self.times_to_plot)
# Get metrics for all models tested
for i in self.models_learned:
# dict of all Bayes factors for each model considered.
self.all_bayes_factors[i] = self.get_model_storage_instance_by_id(
i
).model_bayes_factors
self.bayes_factors_data()
# Prepare model/name maps
self.model_id_to_name_map = {}
for k in self.model_name_id_map:
v = self.model_name_id_map[k]
self.model_id_to_name_map[v] = k
# Store model IDs and names
model_data = self.model_database[
# subset of columns to store
[
"model_id",
"model_name",
"latex_name",
"branch_id",
"f_score",
] # TODO add log_likelihood here
]
model_data.to_csv(
os.path.join(self.qmla_controls.plots_directory, "model_directory.csv")
)
def bayes_factors_data(self):
self.bayes_factors_df = pd.DataFrame(
columns=[
"model_a",
"id_a",
"f_score_a",
"model_b",
"id_b",
"f_score_b",
"bayes_factor",
"log10_bayes_factor",
]
)
for m in self.models_learned:
mod = self.get_model_storage_instance_by_id(m)
mod_name_a = mod.model_name
mod_id_a = int(mod.model_id)
f_score_a = qmla.utilities.round_nearest(
self.model_f_scores[mod_id_a], 0.05
)
bayes_factors = mod.model_bayes_factors
for b in bayes_factors:
mod_name_b = self.model_name_id_map[b]
mod_id_b = int(b)
f_score_b = qmla.utilities.round_nearest(
self.model_f_scores[mod_id_b], 0.05
)
for bf in bayes_factors[b]:
d = pd.Series(
{
"model_a": mod_name_a,
"id_a": mod_id_a,
"f_score_a": f_score_a,
"model_b": mod_name_b,
"id_b": mod_id_b,
"f_score_b": f_score_b,
"bayes_factor": bf,
"log10_bayes_factor": np.round(np.log10(bf), 1),
}
)
new_idx = len(self.bayes_factors_df)
self.bayes_factors_df.loc[new_idx] = d
[docs] def get_results_dict(self, model_id=None):
r"""
Store the useful information of a given model, usually the champion.
:param int model_id: unique ID of the model whose information to store
:return dict results_dict: data which will be stored in the results_{ID}.p
file following QMLA's completion.
"""
if model_id is None:
if self.champion_model_id != -1:
model_id = self.champion_model_id
elif self.true_model_id != -1:
model_id = self.true_model_id
else:
model_id = 1
self.log_print(
["No model id passed to get_results_dict; defaulting to 1"]
)
try:
mod = self.get_model_storage_instance_by_id(model_id)
except:
self.log_print(
["Could not get model storage instance for model {}".format(model_id)]
)
model_name = mod.model_name
# Get expectation values of this model
n_qubits = model_building_utilities.get_num_qubits(model_name)
if n_qubits > 5:
expec_val_plot_times = self.times_to_plot_reduced_set
else:
expec_val_plot_times = self.times_to_plot
mod.compute_expectation_values(
times=expec_val_plot_times,
)
# Evaluations of all models in this instance
model_evaluation_log_likelihoods = {
mod_id: self.get_model_storage_instance_by_id(
mod_id
).evaluation_log_likelihood
for mod_id in self.models_learned
}
model_evaluation_median_likelihoods = {
mod_id: self.get_model_storage_instance_by_id(
mod_id
).evaluation_median_likelihood
for mod_id in self.models_learned
}
# Compare this model to the true model (only meaningful for simulated
# cases)
correct_model = misfit = underfit = overfit = 0
num_params_champ_model = mod.model_constructor.num_terms
if model_name == self.true_model_name:
correct_model = 1
elif (
num_params_champ_model == self.true_model_num_params
and model_name != self.true_model_name
):
misfit = 1
elif num_params_champ_model > self.true_model_num_params:
overfit = 1
elif num_params_champ_model < self.true_model_num_params:
underfit = 1
num_params_difference = self.true_model_num_params - num_params_champ_model
true_model_family_found = (
self.exploration_strategy_of_true_model
== mod.exploration_strategy_of_this_model
)
# Summarise the results of this model and instance in a dictionary
# Note this is used to feed offline analysis including outdated methods
# new analysis should use the pandas databases within instances and combined
# at the run level.
time_taken = time.time() - self._start_time
results_dict = {
# Details about QMLA instance:
"QID": self.qmla_id,
"NumParticles": self.num_particles,
"NumExperiments": mod.num_experiments,
"ConfigLatex": self.latex_config,
"Heuristic": mod.model_heuristic_class,
"Time": time_taken,
"Host": self.redis_host_name,
"Port": self.redis_port_number,
"ResampleThreshold": self.exploration_class.qinfer_resampler_threshold,
"ResamplerA": self.exploration_class.qinfer_resampler_a,
# Details about true model:
"TrueModel": self.true_model_name,
"TrueModelConsidered": self.true_model_considered,
"TrueModelFound": self.true_model_found,
"TrueModelBranch": self.true_model_branch,
"Truemodel_id": self.true_model_id,
"TrueModelConstituentTerms": self.true_model_constituent_terms_latex,
"TrueExplorationStrategy": self.exploration_strategy_of_true_model,
# Details about this model
"ChampID": model_id,
"ChampLatex": mod.model_name_latex,
"ConstituentTerms": mod.constituents_terms_latex,
"LearnedHamiltonian": mod.learned_hamiltonian,
"ExplorationRule": mod.exploration_strategy_of_this_model,
"NameAlphabetical": model_building_utilities.alph(mod.model_name),
"LearnedParameters": mod.qhl_final_param_estimates,
"FinalSigmas": mod.qhl_final_param_uncertainties,
"ExpectationValues": mod.expectation_values,
"Trackplot_parameter_estimates": mod.track_parameter_estimates,
"TrackVolume": mod.volume_by_epoch,
"TrackTimesLearned": mod.times_learned_over,
"QuadraticLosses": mod.quadratic_losses_record,
"FinalRSquared": mod.r_squared(
times=expec_val_plot_times,
),
"Fscore": self.model_f_scores[model_id],
"Precision": self.model_precisions[model_id],
"Sensitivity": self.model_sensitivities[model_id],
"PValue": mod.p_value,
# Comparison to true model (for simulated cases)
"NumParamDifference": num_params_difference,
"Underfit": underfit,
"Overfit": overfit,
"Misfit": misfit,
"CorrectModel": correct_model,
"TrueFamilyFound": true_model_family_found,
# About QMLA's learning procedure:
"NumModels": len(self.models_learned),
"StatisticalMetrics": self.generational_statistical_metrics,
"GenerationalFscore": self.generational_f_score,
"GenerationalLogLikelihoods": self.generational_log_likelihoods,
"ModelEvaluationLogLikelihoods": model_evaluation_log_likelihoods,
"ModelEvaluationMedianLikelihoods": model_evaluation_median_likelihoods,
"AllModelFScores": self.model_f_scores,
}
self.storage = qmla.utilities.StorageUnit()
self.storage.qmla_id = self.qmla_id
self.storage.bayes_factors_df = self.bayes_factors_df
self.storage.model_f_scores = self.model_f_scores
self.storage.exploration_strategy_storage = self.exploration_class.storage
# store expectation values of all models
df_cols = ["time", "exp_val", "model_id", "qmla_id"]
expectation_values_df = pd.DataFrame(columns=df_cols)
for m in self.models_learned:
mod = self.get_model_storage_instance_by_id(m)
times = list(sorted(mod.expectation_values.keys()))
ev = [mod.expectation_values[t] for t in times]
d = pd.DataFrame(
columns=df_cols,
)
d["time"] = times
d["exp_val"] = ev
d["model_id"] = m
d["qmla_id"] = self.qmla_id
expectation_values_df = expectation_values_df.append(d)
self.storage.expectation_values = expectation_values_df
try:
# TODO this fails for QHL mode since champion not assigned -- fix
self.storage.branch_champions = {
b: self.branches[b].champion_id for b in self.branches
}
except:
pass
models_generated = self.model_database[
["model_name", "model_id", "latex_name", "f_score", "terms"]
]
models_generated["champion"] = False
models_generated.loc[
(models_generated.model_id == self.champion_model_id), "champion"
] = True
self.storage.models_generated = models_generated
for r in results_dict:
# TODO: get rid of results_dict; use storage class instead to achieve the same things
self.storage.__setattr__(r, results_dict[r])
return results_dict
[docs] def check_champion_reducibility(
self,
):
r"""
Potentially remove negligible terms from the champion model.
Consider whether the champion model has some terms whose parameters
were found to be negligible (either within one standard
deviation from 0, or very close to zero as determined by the exploration strategy's
`learned_param_limit_for_negligibility` attribute).
Construct a new model which is the same as the champion, less those negligible
terms, named the reduced champion. The data of the champion model is inherited
by the reduced candidate model, i.e. its parameter estimates, as well as
its history of parameter learning for those which are not negligible.
A new `normalization_record` is started, which is used in the comparison between
the champion and the reduced champion.
Compare the champion with the reduced champion; if the reduced champion
is heavily favoured, directly select it as the global champion.
This method is triggered if the exploration strategy's `check_champion_reducibility`
attribute is set to True.
"""
import qinfer
champ_mod = self.get_model_storage_instance_by_id(self.global_champion_id)
self.log_print(
[
"Checking reducibility of champ model:",
self.global_champion_name,
"\nParams:\n",
champ_mod.qhl_final_param_estimates,
"\nSigmas:\n",
champ_mod.qhl_final_param_uncertainties,
]
)
params = list(champ_mod.qhl_final_param_estimates.keys())
to_remove = []
removed_params = {}
idx = 0
for p in params:
# if champ_mod.qhl_final_param_uncertainties[p] > champ_mod.qhl_final_param_estimates[p]:
# to_remove.append(p)
# removed_params[p] = np.round(
# champ_mod.qhl_final_param_estimates[p],
# 2
# )
if (
np.abs(champ_mod.qhl_final_param_estimates[p])
< self.exploration_class.learned_param_limit_for_negligibility
):
to_remove.append(p)
removed_params[p] = np.round(champ_mod.qhl_final_param_estimates[p], 2)
if len(to_remove) >= len(params):
self.log_print(
[
"Attempted champion reduction failed due to",
"all parameters found as neglibible.",
"Check method of determining negligibility.",
"(By default, parameter removed if sigma of that",
"parameters final posterior > parameter.",
"i.e. 0 within 1 sigma of distriubtion",
]
)
return
if len(to_remove) > 0:
new_model_terms = list(set(params) - set(to_remove))
new_mod = "+".join(new_model_terms)
new_mod = model_building_utilities.alph(new_mod)
self.log_print(
[
"Some neglibible parameters found:",
removed_params,
"\nReduced champion model suggested:",
new_mod,
]
)
reduced_mod_info = self.add_model_to_database(
model=new_mod, force_create_model=True
)
reduced_mod_id = reduced_mod_info["model_id"]
reduced_mod_instance = self.get_model_storage_instance_by_id(reduced_mod_id)
reduced_mod_terms = sorted(
model_building_utilities.get_constituent_names_from_name(new_mod)
)
# get champion leared info
reduced_champion_info = pickle.loads(
self.redis_databases["learned_models_info_db"].get(
str(self.champion_model_id)
)
)
reduced_params = {}
reduced_sigmas = {}
for term in reduced_mod_terms:
reduced_params[term] = champ_mod.qhl_final_param_estimates[term]
reduced_sigmas[term] = champ_mod.qhl_final_param_uncertainties[term]
learned_params = [reduced_params[t] for t in reduced_mod_terms]
sigmas = np.array([reduced_sigmas[t] for t in reduced_mod_terms])
final_params = np.array(list(zip(learned_params, sigmas)))
new_cov_mat = np.diag(sigmas ** 2)
new_prior = qinfer.MultivariateNormalDistribution(
learned_params, new_cov_mat
)
# reduce learned info where appropriate
reduced_champion_info["name"] = new_mod
reduced_champion_info["model_terms_names"] = reduced_mod_terms
reduced_champion_info["final_cov_mat"] = new_cov_mat
reduced_champion_info["final_params"] = final_params
reduced_champion_info["learned_parameters"] = reduced_params
reduced_champion_info["model_id"] = reduced_mod_id
reduced_champion_info["final_prior"] = new_prior
reduced_champion_info["est_mean"] = np.array(learned_params)
reduced_champion_info["final_sigmas"] = reduced_sigmas
reduced_champion_info["initial_params"] = reduced_sigmas
# do not inherit normalization_record and times from original
# champion
reduced_champion_info["normalization_record"] = []
reduced_champion_info["times"] = []
compressed_reduced_champ_info = pickle.dumps(
reduced_champion_info, protocol=4
)
# TODO generate new model for champion
# - scratch normalization record;
# - learn according to MPGH for both champion
# and suggested reduced champion,
# then take BF based on that
self.redis_databases["learned_models_info_db"].set(
str(float(reduced_mod_id)), compressed_reduced_champ_info
)
self.get_model_storage_instance_by_id(
reduced_mod_id
).model_update_learned_values()
bayes_factor = self.compare_model_pair(
model_a_id=int(self.champion_model_id),
model_b_id=int(reduced_mod_id),
wait_on_result=True,
)
self.log_print(["BF b/w champ and reduced champ models:", bayes_factor])
if bayes_factor < (
1.0 / self.exploration_class.reduce_champ_bayes_factor_threshold
):
# overwrite champ id etc
self.log_print(
[
"Replacing champion model ({}) with reduced champion model ({} - {})".format(
self.champion_model_id, reduced_mod_id, new_mod
),
"\n i.e. removing negligible parameter terms:\n{}".format(
removed_params
),
]
)
original_champ_id = self.champion_model_id
self.champion_model_id = reduced_mod_id
self.global_champion = new_mod
# inherits BF of champion from which it derived (only really
# used for plotting)
new_champ = self.get_model_storage_instance_by_id(
self.champion_model_id
)
new_champ.model_bayes_factors = self.get_model_storage_instance_by_id(
original_champ_id
).model_bayes_factors
new_champ.times_learned_over = champ_mod.times_learned_over
self.models_learned.append(reduced_mod_id)
else:
self.log_print(["Parameters non-negligible; not replacing champion model."])
[docs] def compare_nominated_champions(self):
r"""
Compare the champions of all exploration strategy trees.
Get the champions (usually one, but in general can be multiple)
from each tree, where each tree is unique to an exploration strategy.
Place the champions on a branch together and perform all-versus-all
comparisons. The champion of that branch is deemed the global champion.
"""
tree_champions = []
for tree in self.trees.values():
# extend in case multiple models nominated by tree
tree_champions.extend(tree.nominate_champions())
# Place tree champions on new QMLA branch, not tied to an exploration strategy
global_champ_branch_id = self.new_branch(model_list=tree_champions)
global_champ_branch = self.branches[global_champ_branch_id]
# Compare models (using this fnc so we can wait_on_result)
# self.compare_model_set(
# pair_list=global_champ_branch.pairs_to_compare,
# wait_on_result=True,
# )
self.compare_models_within_branch(
branch_id=global_champ_branch_id,
pair_list=global_champ_branch.pairs_to_compare,
)
# TODO wait until all BF computed on final branch
active_branches_bayes = self.redis_databases["active_branches_bayes"]
num_comparisons_complete_on_branch = active_branches_bayes.get(
int(global_champ_branch_id)
)
self.log_print(
[
"Starting to wait on comparisons between branch champions.",
"Initially completed:",
num_comparisons_complete_on_branch,
"num pairs on branch:",
global_champ_branch.num_model_pairs,
]
)
while not global_champ_branch.comparisons_complete:
num_comparisons_complete_on_branch = int(
active_branches_bayes.get(int(global_champ_branch_id))
)
if (
num_comparisons_complete_on_branch
== global_champ_branch.num_model_pairs
):
global_champ_branch.comparisons_complete = True
self.log_print(["Comparisons between branch champions complete."])
champ_id = self.process_comparisons_within_branch(
branch_id=global_champ_branch_id
)
# Assign champion of set to be global champion
self.global_champion_id = champ_id
self.global_champion_model = self.get_model_storage_instance_by_id(
self.global_champion_id
)
self.global_champion_name = self.global_champion_model.model_name
self.log_print(
[
"Global champion branch points:",
global_champ_branch.bayes_points,
"\nGlobal champion ID:",
champ_id,
"\nGlobal champion:",
self.global_champion_name,
]
)
##########
# Section: Run available algorithms (QMLA, QHL or QHL with multiple models)
##########
[docs] def run_quantum_hamiltonian_learning(
self,
):
r"""
Run Quantum Hamiltonian Learning algorithm .
The `true_model` of the :class:`~qmla.exploration_strategies.ExplorationStrategy` is used to generate
true data (in simulation) and have its parameters learned.
"""
qhl_branch = self.new_branch(
exploration_strategy=self.exploration_strategy_of_true_model,
model_list=[self.true_model_name],
)
mod_to_learn = self.true_model_name
self.log_print(
[
"QHL for true model:",
mod_to_learn,
]
)
self.learn_model(model_name=mod_to_learn, branch_id=qhl_branch, blocking=True)
mod_id = self._get_model_id_from_name(model_name=mod_to_learn)
# These don't really matter for QHL,
# but are used in plots etc:
self.true_model_id = mod_id
self.champion_model_id = mod_id
self.true_model_found = True
self.true_model_considered = True
self.log_print(["Learned model {}: {}".format(mod_id, mod_to_learn)])
self._update_database_model_info()
self.exploration_class.exploration_strategy_finalise()
self.finalise_instance()
# self._plot_statistical_metrics()
[docs] def run_quantum_hamiltonian_learning_multiple_models(self, model_names=None):
r"""
Run Quantum Hamiltonian Learning algorithm with multiple simulated models.
Numerous Hamiltonian models attempt to learn the dynamics of the true model.
The underlying model is set in the :class:`~qmla.exploration_strategies.ExplorationStrategy`'s `true_model` attribute.
:param list model_names:
list of strings of model names to learn the parameterisations of.
None: taken from :class:`~qmla.exploration_strategies.ExplorationStrategy` `qhl_models`.
"""
# Choose models to perform QHL on
if model_names is None:
model_names = self.exploration_class.qhl_models
# Place models on a branch
branch_id = self.new_branch(
exploration_strategy=self.exploration_strategy_of_true_model,
model_list=model_names,
)
self.qhl_mode_multiple_models = True
self.champion_model_id = -1 # TODO just so not to crash during dynamics plot
self.qhl_mode_multiple_models_model_ids = [
self._get_model_id_from_name(model_name=mod_name)
for mod_name in model_names
]
self.log_print(
[
"QHL for multiple models:",
model_names,
]
)
learned_models_ids = self.redis_databases["learned_models_ids"]
# learn models
for mod_name in model_names:
mod_id = self._get_model_id_from_name(model_name=mod_name)
learned_models_ids.set(str(mod_id), 0)
self.learn_model(model_name=mod_name, branch_id=branch_id, blocking=False)
running_models = learned_models_ids.keys()
self.log_print(
[
"Running Models:",
running_models,
]
)
for k in running_models:
# waiting on all models to finish,
while int(learned_models_ids.get(k)) != 1:
sleep(self.sleep_duration)
self._inspect_remote_job_crashes()
# Learning finished
self.log_print(
[
"Finished learning for all:",
running_models,
]
)
# Tidy up: store learned info, analyse, etc.
for mod_name in model_names:
mod_id = self._get_model_id_from_name(model_name=mod_name)
mod = self.get_model_storage_instance_by_id(mod_id)
mod.model_update_learned_values()
self.exploration_class.exploration_strategy_finalise()
self.model_id_to_name_map = {}
for k in self.model_name_id_map:
v = self.model_name_id_map[k]
self.model_id_to_name_map[v] = k
for k in self.timings:
self.log_print(
["QMLA Timing - {}: {}".format(k, np.round(self.timings[k], 2))]
)
self.finalise_instance()
[docs] def run_complete_qmla(
self,
):
r"""
Run complete Quantum Model Learning Agent algorithm.
Each :class:`~qmla.exploration_strategies.ExplorationStrategy` is assigned a :class:`~qmla.tree.QMLATree`,
which manages the exploration strategy. When new models are spawned by an exploration strategy,
they are placed on a :class:`~qmla.tree.BranchQMLA` of the corresponding tree.
Models are learned/compared/spawned iteratively in
:meth:`learn_models_until_trees_complete`, until all
trees declare that their exploration strategy has completed.
Exploration Strategies are complete when they have nominated one or more champions,
which can follow spawning/pruning stages as required by the exploration strategy.
Nominated champions are then compared with :meth:`compare_nominated_champions`,
resulting in a single global champion selected.
Some analysis then takes place, including possibly reducing the
selected global champion if it is found that some of its terms are not impactful.
"""
# Set up one tree per exploration strategy
for tree in list(self.trees.values()):
starting_models, models_to_compare = tree.get_initial_models()
# TODO genetic alg giving some non-unique initial model sets
self.log_print(
[
"First branch for {} has ( {}/{} unique ) starting models: {}".format(
tree.exploration_strategy,
len(set(starting_models)),
len(starting_models),
starting_models,
),
# "models_to_compare:", models_to_compare
]
)
self.new_branch(
model_list=starting_models,
exploration_strategy=tree.exploration_strategy,
pairs_to_compare_by_names=models_to_compare,
)
# Iteratively learn models, compute bayes factors, spawn new models
self.learn_models_until_trees_complete()
self.log_print(["Exploration Strategy trees completed."])
# Choose champion by comparing nominated champions of all trees.
self.compare_nominated_champions()
self.champion_model_id = self._get_model_data_by_field(
name=self.global_champion_name, field="model_id"
)
self.log_print(["Champion selected. ID={}".format(self.champion_model_id)])
# Internal analysis
try:
if self.global_champion_id == self.true_model_id:
self.true_model_found = True
else:
self.true_model_found = False
except BaseException:
self.true_model_found = False
self._update_database_model_info()
if self.true_model_found:
self.log_print(
[
"True model found: {}".format(
model_building_utilities.alph(self.true_model_name)
)
]
)
self.log_print(
[
"True model considered: {}. on branch {}.".format(
self.true_model_considered, self.true_model_branch
)
]
)
# Consider reducing champion if negligible parameters found
if self.exploration_class.check_champion_reducibility:
self.check_champion_reducibility()
# Tidy up and finish QMLA.
self.finalise_instance()
self.log_print(
[
"\nFinal winner:",
self.global_champion_name,
"(ID {}) has F-score {}".format(
self.champion_model_id,
np.round(self.model_f_scores[self.champion_model_id], 2),
),
]
)
##########
# Section: Database interface
##########
[docs] def _get_model_data_by_field(self, name, field):
r"""
Get any data from the model database corresponding to a given model name.
:param str name: model name to get data of
:param str field: field name to get data corresponding to model
"""
d = self.model_database[self.model_database["model_name"] == name][field].item()
return d
def _get_model_id_from_name(self, model_name):
model_id = self._get_model_data_by_field(name=model_name, field="model_id")
return model_id
[docs] def _consider_new_model(self, model_name):
r"""
Check whether a proposed model already exists.
Check whether the new model `name`, exists in
all previously considered models, held in `model_lists`, organised
by dimension of models.
If name has not been previously considered, 'New' is returned.
If name has been previously considered, the corresponding location
in db is returned.
:param dict model_lists: lists of models already considered, organised
by the number of qubits of those models
:param str name: model for consideration
"""
# Return true indicates it has not been considered and so can be added
al_name = qmla.model_building_utilities.alph(model_name)
n_qub = qmla.model_building_utilities.get_num_qubits(model_name)
if al_name in self.model_lists[n_qub]:
return (
"Previously Considered" # todo -- make clear if in legacy or running db
)
else:
return "New"
[docs] def _check_model_exists(self, model_name):
r"""
True if model already exists; False if not.
"""
if self._consider_new_model(model_name) == "New":
return False
else:
return True
##########
# Section: Utilities
##########
[docs] def log_print(self, to_print_list):
r"""Wrapper for :func:`~qmla.print_to_log`"""
qmla.logging.print_to_log(
to_print_list=to_print_list,
log_file=self.log_file,
log_identifier="QMLA {}".format(self.qmla_id),
)
[docs] def get_model_storage_instance_by_id(self, model_id):
r"""
Get the unique :class:`~qmla.ModelInstanceForLearning` for the given model_id.
:param int model_id: unique ID of desired model
:return: storage class of the model
:rtype: :class:`~qmla.ModelInstanceForLearning`
"""
idx = self.model_database.loc[
self.model_database["model_id"] == model_id
].index[0]
model_instance = self.model_database.loc[idx]["model_storage_instance"]
return model_instance
[docs] def _update_database_model_info(self):
r"""
Calls :meth:`~qmla.ModelForStorage.model_update_learned_values` for all models learned in this instance.
"""
self.log_print(["Updating info for all learned models"])
for mod_id in self.models_learned:
try:
mod = self.get_model_storage_instance_by_id(mod_id)
mod.model_update_learned_values()
except BaseException:
pass
[docs] def _inspect_remote_job_crashes(self):
r"""Check if any job on redis queue has failed."""
self.call_counter["job_crashes"] += 1
t_init = time.time()
if self.redis_databases["any_job_failed"]["Status"] == b"1":
# TODO better way to detect errors?
self.log_print(["Failure on remote job. Terminating QMLA."])
raise NameError("Remote model learning failure")
self.timings["inspect_job_crashes"] += time.time() - t_init
[docs] def _delete_unpicklable_attributes(self):
r"""Remove elements of QMLA which cannot be pickled, which cause errors if retained."""
del self.redis_conn
del self.redis_databases
del self.write_log_file
##########
# Section: Analysis/plotting methods
##########
[docs] def analyse_instance(self):
r"""Basic analysis of this instance"""
pickle.dump(
self.get_results_dict(),
open(self.qmla_controls.results_file, "wb"),
protocol=4,
)
storage_location = os.path.join(
self.qmla_controls.results_directory,
"storage_{}.p".format(self.qmla_controls.long_id),
)
pickle.dump(
self.storage,
open(storage_location, "wb"),
protocol=4,
)
if self.qhl_mode:
self._analyse_qhl()
elif self.qhl_mode_multiple_models:
self._analyse_multiple_model_qhl()
else:
self._analyse_qmla()
def _analyse_qhl(self):
return
def _analyse_multiple_model_qhl(self):
model_ids = [
self._get_model_id_from_name(model_name=mod)
for mod in self.exploration_class.qhl_models
]
for mid in model_ids:
mod = self.get_model_storage_instance_by_id(mid)
name = mod.model_name
results_file = str(
self.qmla_controls.results_directory
+ "results_"
+ str("m{}_q{}.p".format(int(mid), self.qmla_controls.long_id))
)
pickle.dump(
self.get_results_dict(model_id=mid),
open(results_file, "wb"),
protocol=4,
)
def _analyse_qmla(self):
expec_value_mods_to_plot = []
try:
expec_value_mods_to_plot = [self.true_model_id]
except BaseException:
pass
expec_value_mods_to_plot.append(self.champion_model_id)
champ_mod = self.get_model_storage_instance_by_id(self.champion_model_id)
try:
self.store_bayes_factors_to_csv(
save_to_file=str(
self.qmla_controls.results_directory
+ "bayes_factors_"
+ str(self.qmla_controls.long_id)
+ ".csv"
),
names_ids="latex",
)
except Exception as e:
self.log_print(
["failed to store_bayes_factors_to_csv with error {}".format(e)]
)
[docs] def store_bayes_factors_to_csv(self, save_to_file, names_ids="latex"):
r"""
*deprecated* Store the pairwise comparisons computed during this instance.
:func:`~qmla.analysis.model_bayes_factorsCSV` removed and is needed
TODO if wanted, find in old github commits and reimplement.
Wrapper for :func:`~qmla.analysis.model_bayes_factorsCSV`.
"""
qmla.analysis.model_bayes_factorsCSV(self, save_to_file, names_ids=names_ids)
[docs] def store_bayes_factors_to_shared_csv(self, bayes_csv):
r"""
Store the pairwise comparisons computed during this instance in a CSV shared by all concurrent instances.
"""
# TODO this doesn't get used anywhere useful any more; remove
qmla.analysis.update_shared_bayes_factor_csv(
self, self.qmla_controls.cumulative_csv
)
[docs] def compute_model_f_score(
self,
model_id,
model_name=None,
model_constructor=None,
exploration_class=None,
beta=1, # beta=1 for F1-score. Beta is relative importance of sensitivity to precision
):
r"""
Compte and store f-score of given model.
:param int model_id: model ID to compute f-score of
:param float beta: for generalised F_beta score. (default) 1 for F1 score.
:return float f_score: F-score of given model.
"""
# TODO set precision, f-score etc as model instance attributes and
# return those in champion_results
true_set = self.exploration_class.true_model_terms
self.log_print(["Getting F score for model {}".format(model_id)])
if exploration_class is None:
model_name = self.model_name_id_map[model_id]
stored_model = self.get_model_storage_instance_by_id(model_id)
exploration_class = stored_model.exploration_class
# terms = [
# exploration_class.latex_name(
# term
# )
# for term in
# model_building_utilities.get_constituent_names_from_name(
# model_name
# )
# ]
terms = model_constructor.terms_names_latex
learned_set = set(sorted(terms))
total_positives = len(true_set)
true_positives = len(true_set.intersection(learned_set))
false_positives = len(learned_set - true_set)
false_negatives = len(true_set - learned_set)
precision = true_positives / (true_positives + false_positives)
sensitivity = true_positives / total_positives
try:
f_score = (1 + beta ** 2) * (
(precision * sensitivity) / (beta ** 2 * precision + sensitivity)
)
except BaseException:
# both precision and sensitivity=0 as true_positives=0
f_score = 0
self.model_f_scores[model_id] = f_score
self.model_precisions[model_id] = precision
self.model_sensitivities[model_id] = sensitivity
return f_score
[docs] def plot_instance_outcomes(
self,
):
r"""
Generate plots corresponding to this instance.
A number of plotting routines are called, depending on the plot_level
set by the user at launch.
"""
self.log_print(["Plotting instance outcomes"])
plot_methods_by_level = {
1: [
self._plot_model_terms,
],
2: [
self._plot_one_qubit_probes_bloch_sphere,
],
3: [
self._plot_dynamics_all_models_on_branches,
self._plot_bayes_factors,
self._plot_branch_champs_quadratic_losses,
],
4: [
self._plot_exploration_tree,
self._plot_r_squared_by_epoch_for_model_list,
self._plot_statistical_metrics,
],
}
for pl in range(self.plot_level + 1):
if pl in plot_methods_by_level:
self.log_print(["Plotting for plot_level={}".format(pl)])
for method in plot_methods_by_level[pl]:
try:
method()
except Exception as e:
self.log_print(
[
"plot failed {} with exception: {}".format(
method.__name__, e
)
]
)
if self.plot_level >= 3:
try:
self.branch_graphs = qmla.analysis.branch_graphs.plot_qmla_branches(
q=self, show_fscore_cmap=True, return_graphs=False
)
except:
self.log_print(["Failed to plot branch graphs."])
self.log_print(["Plotting exploration strategy analysis"])
self.exploration_class.exploration_strategy_specific_plots(
save_directory=self.qmla_controls.plots_directory,
qmla_id=self.qmla_controls.long_id,
true_model_id=self.true_model_id,
champion_model_id=self.champion_model_id,
plot_level=self.plot_level,
figure_format=self.qmla_controls.figure_format,
)
[docs] def compute_statistical_metrics_by_generation(self):
r"""
Compute, store and plot various statistical metrics of all studied models.
:param str save_to_file: path to save the resultant figure in.
"""
generations = sorted(set(self.branches.keys()))
self.log_print(
[
"[compute_statistical_metrics_by_generation]",
"generations: ",
generations,
]
)
generational_sensitivity = {b: [] for b in generations}
generational_f_score = {b: [] for b in generations}
generational_precision = {b: [] for b in generations}
self.generational_log_likelihoods = {b: [] for b in generations}
for b in generations:
models_this_branch = sorted(self.branches[b].resident_model_ids)
self.log_print(
[
"Adding models to generational measures for Generation {}:{}".format(
b, models_this_branch
)
]
)
for m in models_this_branch:
generational_sensitivity[b].append(self.model_sensitivities[m])
generational_precision[b].append(self.model_precisions[m])
generational_f_score[b].append(self.model_f_scores[m])
self.generational_log_likelihoods[b].append(
self.get_model_storage_instance_by_id(m).evaluation_log_likelihood
)
self.generational_f_score = generational_f_score
self.generational_sensitivity = generational_sensitivity
self.generational_precision = generational_precision
self.stat_data = [
{"name": "F-score", "data": self.generational_f_score, "colour": "red"},
{
"name": "Precision",
"data": self.generational_precision,
"colour": "blue",
},
{
"name": "Sensitivity",
"data": self.generational_sensitivity,
"colour": "green",
},
]
self.generational_statistical_metrics = {
k["name"]: k["data"] for k in self.stat_data
}
def _plot_statistical_metrics(self, save_to_file=None):
generations = sorted(set(self.branches.keys()))
self.alt_generational_statistical_metrics = {
b: {
"Precision": self.generational_precision[b],
"Sensitivity": self.generational_sensitivity[b],
"F-score": self.generational_f_score[b],
}
for b in generations
}
include_plots = self.stat_data
lf = LatexFigure(gridspec_layout=(1, len(include_plots)))
plot_col = 0
for plotting_data in include_plots:
# ax = fig.add_subplot(gs[0, plot_col])
ax = lf.new_axis()
data = plotting_data["data"]
ax.plot(
generations,
[np.median(data[b]) for b in generations],
label="{} median".format(plotting_data["name"]),
color=plotting_data["colour"],
marker="o",
)
ax.fill_between(
generations,
[np.min(data[b]) for b in generations],
[np.max(data[b]) for b in generations],
alpha=0.2,
label="{} min/max".format(plotting_data["name"]),
color=plotting_data["colour"],
)
ax.set_ylabel("{}".format(plotting_data["name"]))
ax.set_xlabel("Generation")
ax.legend()
ax.set_ylim(0, 1)
# plot_col += 1
self.log_print(["getting statistical metrics complete"])
if save_to_file is not None:
plt.savefig(save_to_file)
[docs] def _plot_bayes_factors(
self,
):
r"""
Plot Bayes factors between pairs of models, both by model IDs and by their F-scores.
"""
# Plot Bayes factors of this instance
bayes_factor_by_id = pd.pivot_table(
self.bayes_factors_df,
values="log10_bayes_factor",
index=["id_a"],
columns=["id_b"],
aggfunc=np.median,
)
mask = np.tri(bayes_factor_by_id.shape[0], k=-1).T
lf = LatexFigure(auto_label=False)
ax = lf.new_axis()
sns.heatmap(
bayes_factor_by_id,
cmap=self.exploration_class.bf_cmap,
mask=mask,
ax=ax,
annot=False,
cbar_kws={
"orientation": "vertical",
"label": r"$\log_{10}\left(B_{i,j}\right)$",
},
)
ax.set_ylabel(r"ID $\hat{H}_i$")
ax.set_xlabel(r"ID $\hat{H}_j$")
lf.save(
os.path.join(
self.qmla_controls.plots_directory,
"bayes_factors".format(self.qmla_controls.long_id),
),
file_format=self.qmla_controls.figure_format,
)
# Heat map BF against F(A)/F(B)
qmla.analysis.bayes_factor_f_score_heatmap(
bayes_factors_df=self.bayes_factors_df,
save_to_file=os.path.join(
self.qmla_controls.plots_directory, "bayes_factors_by_f_score"
),
)
[docs] def _plot_branch_champs_quadratic_losses(
self,
):
r"""Wrapper for :func:`~qmla.analysis.plot_quadratic_loss`."""
qmla.analysis.plot_quadratic_loss(
qmd=self,
champs_or_all="champs",
save_to_file=os.path.join(
self.qmla_controls.plots_directory, "quadratic_losses_branch_champs.pdf"
),
)
[docs] def _plot_branch_champs_volumes(
self,
model_id_list=None,
branch_champions=True,
branch_id=None,
save_to_file=None,
):
r"""
Plot the volume of each branch champion within this instance.
:param list model_id_list: list of model IDs to plot volumes of,
if None plot branch champions
:param bool branch_champions: force plot only branch champions' volumes
:param int branch_id: if provided, plot the volumes of all models within
that branch
:param str save_to_file: path at which to store the resultant figure.
"""
plt.clf()
plot_descriptor = (
"\n("
+ str(self.num_particles)
+ "particles; "
+ str(self.num_experiments)
+ "experiments)."
)
if branch_champions:
# only plot for branch champions
model_id_list = list(self.branch_champions.values())
plot_descriptor += "[Branch champions]"
elif branch_id is not None:
model_id_list = list(
self.model_database[self.model_database["branch_id"] == branch_id][
"model_id"
]
)
plot_descriptor += "[Branch" + str(branch_id) + "]"
elif model_id_list is None:
self.log_print(["Plotting volumes for all models by default."])
model_id_list = range(self.highest_model_id)
plot_descriptor += "[All models]"
plt.title("Volume evolution through QMD " + plot_descriptor)
plt.xlabel("Epoch")
plt.ylabel("Volume")
for i in model_id_list:
vols = self.get_model_storage_instance_by_id(i).volume_by_epoch
plt.semilogy(vols, label=str("ID:" + str(i)))
ax = plt.subplot(111)
# Shrink current axis's height by 10% on the bottom
box = ax.get_position()
ax.set_position(
[box.x0, box.y0 + box.height * 0.1, box.width, box.height * 0.9]
)
# Put a legend below current axis
lgd = ax.legend(
loc="upper center",
bbox_to_anchor=(0.5, -0.15),
fancybox=True,
shadow=True,
ncol=4,
)
if save_to_file is None:
plt.show()
else:
plt.savefig(save_to_file, bbox_extra_artists=(lgd,), bbox_inches="tight")
[docs] def _plot_parameter_learning_champion(
self,
):
r"""
Plot parameter estimates vs experiment number for a single model.
Wrapper for :func:`~qmla.analysis.plot_parameter_estimates`
:param bool true_model: whether to force only plotting the true
model's parameter estimeates
"""
qmla.analysis.plot_parameter_estimates(
qmd=self,
model_id=self.champion_model_id,
save_to_file=os.path.join(
self.qmla_controls.plots_directory, "champion_parameters.png"
),
)
[docs] def _plot_parameter_learning_true(
self,
):
r"""
Plot parameter estimates vs experiment number for a single model.
Wrapper for :func:`~qmla.analysis.plot_parameter_estimates`
:param bool true_model: whether to force only plotting the true
model's parameter estimeates
"""
if self.true_model_id == -1:
return
qmla.analysis.plot_parameter_estimates(
qmd=self,
model_id=self.true_model_id,
save_to_file=os.path.join(
self.qmla_controls.plots_directory, "champion_parameters.png"
),
)
[docs] def _plot_parameter_learning_single_model(
self, model_id=0, true_model=False, save_to_file=None
):
r"""
Plot parameter estimates vs experiment number for a single model.
Wrapper for :func:`~qmla.analysis.plot_parameter_estimates`
:param bool true_model: whether to force only plotting the true
model's parameter estimeates
"""
if true_model:
model_id = self._get_model_id_from_name(name=self.true_model_name)
qmla.analysis.plot_parameter_estimates(
qmd=self, model_id=model_id, save_to_file=save_to_file
)
[docs] def _plot_branch_champions_dynamics(
self,
all_models=False,
model_ids=None,
):
r"""
Plot reproduced dynamics of all branch champions
:param bool all_models: whether to plot all models in the instance
:param list model_ids: list of model IDs to plot dynamics of
:param str save_to_file: path at which to save the resultant figure
"""
include_params = False
include_bayes_factors = False
if all_models:
model_ids = list(sorted(self.model_name_id_map.keys()))
elif self.qhl_mode:
model_ids = [self.true_model_id]
include_params = True
elif self.qhl_mode_multiple_models:
model_ids = list(self.qhl_mode_multiple_models_model_ids)
elif self.exploration_class.tree_completed_initially:
model_ids = list(self.models_learned)
include_bayes_factors = True
include_params = True
elif model_ids is None:
model_ids = [self.branches[b].champion_id for b in self.branches]
include_bayes_factors = True
self.log_print(["Plotting dynamics of models:", model_ids])
path_to_save = os.path.join(self.qmla_controls.plots_directory, "dynamics.png")
try:
include_times_learned = False
include_params = False
qmla.analysis.plot_learned_models_dynamics(
qmd=self,
include_bayes_factors=include_bayes_factors,
include_times_learned=include_times_learned,
include_param_estimates=include_params,
model_ids=model_ids,
save_to_file=path_to_save,
)
except BaseException:
self.log_print(["Failed to plot dynamics"])
# raise
[docs] def _plot_volume_after_qhl(
self, model_id=None, true_model=True, show_resamplings=True, save_to_file=None
):
r"""
Plot volume vs experiment number of a single model.
Wrapper for :func:`~qmla.analysis.plot_volume_after_qhl`
"""
qmla.analysis.plot_volume_after_qhl(
qmd=self,
model_id=model_id,
true_model=true_model,
show_resamplings=show_resamplings,
save_to_file=save_to_file,
)
[docs] def _plot_exploration_tree(
self, modlist=None, only_adjacent_branches=True, save_to_file=None
):
r"""Wrapper for :func:`~qmla.analysis.plot_qmla_single_instance_tree`"""
if save_to_file is None:
save_to_file = os.path.join(
self.qmla_controls.plots_directory, "exploration_tree.png"
)
qmla.analysis.plot_qmla_single_instance_tree(
self,
modlist=modlist,
only_adjacent_branches=only_adjacent_branches,
save_to_file=save_to_file,
)
[docs] def _plot_qmla_radar_scores(self, modlist=None, save_to_file=None):
r"""*deprecated* Wrapper for :func:`~qmla.analysis.plotRadar`."""
plot_title = str("Radar Plot QMD " + str(self.qmla_id))
if modlist is None:
modlist = list(self.branch_champions.values())
qmla.analysis.plotRadar(
self, modlist, save_to_file=save_to_file, plot_title=plot_title
)
[docs] def _plot_r_squared_by_epoch_for_model_list(self, modlist=None, save_to_file=None):
r"""
Plot $R^2$ vs experiment number for given model list.
"""
if modlist is None:
modlist = []
try:
modlist.append(self.champion_model_id)
except BaseException:
pass
try:
modlist.append(self.true_model_id)
except BaseException:
pass
if save_to_file is None:
save_to_file = os.path.join(
self.qmla_controls.plots_directory, "r_squareds.png"
)
qmla.analysis.r_squared_from_epoch_list(
qmd=self, model_ids=modlist, save_to_file=save_to_file
)
[docs] def _plot_one_qubit_probes_bloch_sphere(self, save=False):
r"""Show all one qubit probes on Bloch sphere."""
qmla.utilities.plot_probes_on_bloch_sphere(
probe_dict=self.probes_system,
num_probes=self.probe_number,
save_to_file=os.path.join(
self.qmla_controls.plots_directory,
"probes_bloch_sphere.{}".format(self.qmla_controls.figure_format),
),
)
[docs] def _plot_model_terms(self, colour_by="binary"):
"""
Plot the terms of each model by model ID.
:param colour_by: defaults to 'binary' for black/white; alternatively colour by f_score of model
:type colour_by: str, optional
"""
plt.rcParams.update({"text.usetex": False})
if self.plot_level < 1:
return
# Prepare dataframes
unique_terms = list(
set(qmla.utilities.flatten(list(self.model_database.latex_terms)))
)
unique_branches = list(
set(qmla.utilities.flatten(list(self.model_database.branches_present_on)))
)
unique_branches = ["branch_{}".format(int(b)) for b in unique_branches]
database_columns = ["model_id", "f_score"] + unique_terms
model_reference_database = pd.DataFrame(columns=database_columns)
branch_cols = ["model_id", "f_score"] + unique_branches
models_branches = pd.DataFrame(columns=branch_cols)
for model_id in self.model_database.model_id:
model_data = self.model_database[self.model_database.model_id == model_id]
model_id = int(model_id)
f_score = model_data["f_score"].item()
if colour_by == "binary":
terms_in_model = {
term: int(1) # for binary representation
for term in model_data.latex_terms.item()
}
elif colour_by == "f_score":
terms_in_model = {
term: f_score # to colour by f_score
for term in model_data.latex_terms.item()
}
terms_in_model["model_id"] = int(model_id)
terms_in_model["f_score"] = model_data.f_score.item()
model_reference_database.loc[len(model_reference_database)] = pd.Series(
terms_in_model
)
branches = {
"branch_{}".format(int(b)): 1
for b in model_data.branches_present_on.item()
}
branches["model_id"] = int(model_id)
models_branches.loc[len(models_branches)] = pd.Series(branches)
if colour_by == "binary":
models_branches.fillna(0, inplace=True)
model_reference_database.fillna(0, inplace=True)
piv_table = pd.pivot_table(
columns=["model_id"], values=unique_terms, data=model_reference_database
).transpose()
# Plot as heatmap
lf = LatexFigure(
auto_label=False,
font_scale=2,
) # TODO make figure size depend on num terms.
ax = lf.new_axis()
if colour_by == "f_score":
sns.heatmap(
piv_table,
cmap=self.exploration_class.f_score_cmap,
ax=ax,
cbar_kws={
"label": "F-score",
},
)
elif colour_by == "binary":
sns.heatmap(
piv_table,
linewidths=0.5,
cmap="binary",
cbar=False,
ax=ax,
)
ax.tick_params(which="y", rotation=0)
# fontsize = 20
ax.tick_params(
top=True,
bottom=False,
labeltop=True,
labelbottom=False,
labelrotation=0,
# labelsize=fontsize
)
ax.set_ylabel("Model ID")
ax.set_xlabel("Term")
lf.save(
os.path.join(self.qmla_controls.plots_directory, "composition_of_models"),
file_format=self.qmla_controls.figure_format,
)
[docs] def _plot_dynamics_all_models_on_branches(self, branches=None):
"""Plot the dynamics of all models on given branches.
:param branches: list of branches to draw dynamics for, defaults to None, in which case all branches are drawn.
:type branches: list, optional
"""
self.branch_results_dir = os.path.join(
self.qmla_controls.plots_directory, "branches"
)
try:
os.makedirs(self.branch_results_dir)
except:
pass
if branches is None:
branches = sorted(list(self.branches.keys()))
colours = itertools.cycle(
["blue", "orange", "green", "cyan", "purple", "olive", "grey"]
)
linestyles = itertools.cycle(["solid", "dashed", "dotted", "dashdot"])
max_models_per_subplot = 5
for branch_id in branches:
models = self.branches[branch_id].resident_model_ids
times = sorted(self.experimental_measurements.keys())
num_rows = math.ceil(len(models) / max_models_per_subplot)
lf = LatexFigure(
fraction=0.45, gridspec_layout=(num_rows, 1), auto_label=False
)
self.log_print(
[
"plotting branch dynamics. On branch {} there are {} rows".format(
branch_id, num_rows
)
]
)
n_models_this_row = 0
ax = lf.new_axis()
for m in models:
mod = self.get_model_storage_instance_by_id(m)
computed_expec_val_times = sorted(mod.expectation_values.keys())
try:
exp_vals = [
mod.expectation_values[t] for t in computed_expec_val_times
]
except:
self.log_print(
["Failed to get expectation values for model id {}".format(m)]
)
raise
ax.plot(
computed_expec_val_times,
exp_vals,
label=r"${}$".format(m),
# label="{} (ID={}, $LL$={})".format(mod.model_name_latex, m, mod.evaluation_log_likelihood),
color=next(colours),
ls=next(linestyles),
)
n_models_this_row += 1
if n_models_this_row == max_models_per_subplot:
n_models_this_row = 0
ax = lf.new_axis()
for row in range(num_rows):
# Add system dynamics to each subplot
ax = lf.gridspec_axes[(row, 0)]
ax.scatter(
times,
[self.experimental_measurements[t] for t in times],
c="red",
label=r"$Q$",
s=5,
)
ax.set_xlim(0, max(times))
ax.set_ylim(0, 1.05)
ax.set_yticks([0, 0.5, 1])
ax.set_ylabel("Expectation Value")
ax.set_xlabel("Time")
ax.legend(
bbox_to_anchor=(1, 0.8)
# loc = "upper center",
# ncol=2
)
path = os.path.join(
self.branch_results_dir, "dynamics_branch_{}".format(branch_id)
)
lf.save(path, file_format=self.qmla_controls.figure_format)
[docs] def _plot_evaluation_normalisation_records(self):
"""Plot the normalisation record of all models grouped by the branch they are on."""
if self.plot_level < 3:
return
for branch_id in list(self.branches.keys()):
fig, ax = plt.subplots(figsize=(15, 10), tight_layout=True)
for m in self.branches[branch_id].resident_model_ids:
mod = self.get_model_storage_instance_by_id(m)
ax.hist(
qmla.utilities.flatten(mod.evaluation_normalization_record),
bins=np.arange(0, 1, 0.05),
label="{} ($LL={}$)".format(
mod.model_name_latex,
# TODO use ES of branch to get latex name
mod.evaluation_log_likelihood,
),
histtype="step",
)
ax.legend(
bbox_to_anchor=(1.1, 1.05),
fontsize=12,
)
ax.set_ylabel("Frequency")
ax.set_xlabel("Likelihood")
ax.set_title(
"Normalisation record for evaluating models on branch {}".format(
branch_id
)
)
fig.savefig(
os.path.join(
self.branch_results_dir,
"normalisation_record_branch_{}.png".format(branch_id),
)
)