import itertools
from multiprocessing.pool import Pool, ThreadPool
from numpy import array, hstack
from copy import deepcopy
from numpy import random
from . import Settings
from .experiment import Experiment
from ..environment import environments
def _f(args):
settings, evaluate_indices, testcases = args
random.seed()
xp = Experiment.from_settings(settings)
xp.evaluate_at(evaluate_indices, testcases)
xp.run()
return xp.log
[docs]class ExperimentPool(object):
def __init__(self, settings, evaluate_at,
testcases=None, same_testcases=False):
""" Pool of experiments running in parallel.
:param list settings: list of settings used to create the pool of experiments.
:param list evaluate_at: iteration indices where an evaluation should be performed.
:param numpy.array testcases: specify a pre-defined testcases, by default a testcases will be generated.
:param bool same_testcases: whether to use the same testcases for all experiments.
The Pool will create :class:`~explauto.experiment.experiment.Experiment` using the :meth:`~explauto.experiment.experiment.Experiment.from_settings` constructor for each combination of parameters given.
.. note:: If you set same_testcases to True the first experiment will generate a testcase used by all the others experiment. Otherwise, each experiment will generate its own testcase.
"""
if same_testcases and testcases is None:
s = settings[0]
testcases = s.default_testcases
self._config = zip(settings,
itertools.repeat(evaluate_at),
itertools.repeat(testcases))
@classmethod
[docs] def from_settings_product(cls, environments, babblings,
interest_models, sensorimotor_models,
evaluate_at,
testcases=None, same_testcases=False):
""" Creates a ExperimentPool with the product of all the given settings.
:param environments: e.g. [('simple_arm', 'default'), ('simple_arm', 'high_dimensional')]
:type environments: list of (environment name, config name)
:param babblings: e.g. ['motor', 'goal']
:type bablings: list of babbling modes
:param interest_models: e.g. [('random', 'default')]
:type interest_models: list of (interest model name, config name)
:param sensorimotor_models: e.g. [('non_parametric', 'default')]
:type sensorimotor_models: list of (sensorimotor model name, config name)
:param evaluate_at: indices defining when to evaluate
:type evaluate_at: list of int
:param bool same_testcases: whether to use the same testcases for all experiments
"""
l = itertools.product(environments, babblings,
interest_models, sensorimotor_models)
settings = [Settings(env, env_conf, bab, im, im_conf, sm, sm_conf)
for ((env, env_conf), bab, (im, im_conf), (sm, sm_conf)) in l]
return cls(settings, evaluate_at, testcases, same_testcases)
[docs] def run(self, repeat=1, processes=None, use_thread=False):
""" Runs all experiments using a :py:class:`multiprocessing.Pool`.
:param int repeat: Number of time each experiment will be repeated.
:param int processes: Number of processes launched in parallel (Default: uses all the availabled CPUs)
:param bool use_thread: Use a :py:class:`~multiprocessing.pool.ThreadPool` instead of a :py:class:`~multiprocessing.pool.Pool`. By default, tries to guess depending on the environment which one to use.
"""
mega_config = [c for c in self._config for _ in range(repeat)]
env = [environments[s.environment][0] for s in self.settings]
use_process = array([e.use_process for e in env]).all() and (not use_thread)
pool = Pool(processes) if use_process else ThreadPool(processes)
logs = pool.map(_f, mega_config)
logs = array(logs).reshape(-1, repeat)
self._add_logs(logs)
return logs
@property
[docs] def settings(self):
""" Returns a copy of the list of all the settings used. """
return array(self._config)[:, 0].tolist()
@property
[docs] def logs(self):
""" Returns the list of :class:`~explauto.experiment.log.ExperimentLog`.
.. note:: The logs will be returned as a vector if repeat was set as one in the :meth:`~explauto.experiment.pool.ExperimentPool.run` method else it will be a matrix where each rows represents the n repetitions of an experiment.
"""
if not hasattr(self, '_logs'):
raise ValueError('You have to run the pool of experiments first!')
logs = self._logs.reshape(-1) if self._logs.shape[1] == 1 else self._logs
return deepcopy(logs)
def _add_logs(self, logs):
if not hasattr(self, '_logs'):
self._logs = logs
else:
self._logs = hstack((self._logs, logs))