# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - initial API and implementation and/or initial
# documentation
# :author: Matthias De Lozzo
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""
Benchmark MDO formulations based on scalable disciplines
========================================================
The :mod:`~gemseo.problems.scalable.api` implements several classes
to benchmark MDO formulations based on scalable disciplines.
The :class:`.ScalabilityStudy` class implements
the concept of scalability study:
1. By instantiating a :class:`.ScalabilityStudy`, the user defines
the MDO problem in terms of design parameters, objective function and
constraints.
2. For each discipline, the user adds a dataset stored
in a :class:`.AbstractFullCache` and select a type of
:class:`.ScalableModel` to build the :class:`.ScalableDiscipline`
associated with this discipline.
3. The user adds different optimization strategies, defined in terms
of both optimization algorithms and MDO formulation.
4. The user adds different scaling strategies, in terms of sizes of
design parameters, coupling variables and equality and inequality
constraints. The user can also define a scaling strategies according to
particular parameters rather than groups of parameters.
5. Lastly, the user executes the :class:`.ScalabilityStudy` and the results
are written in several files and stored into directories
in a hierarchical way, where names depends on both MDO formulation,
scaling strategy and replications when it is necessary. Different kinds
of files are stored: optimization graphs, dependency matrix plots and
of course, scalability results by means of a dedicated class:
:class:`.ScalabilityResult`.
The :class:`.PostScalabilityStudy` class implements the way as the set of
:class:`.ScalabilityResult`-based result files
contained in the study directory are graphically post-processed. This class
provides several methods to easily change graphical properties, notably
the plot labels. It also makes it possible to define a cost function per
MDO formulation, converting the numbers of executions and linearizations
of the different disciplines required by a MDO process in an estimation
of the computational cost associated with what would be a scaled version
of the true problem.
.. warning::
Comparing MDO formulations in terms of estimated true computational time
rather than CPU time of the :class:`.ScalabilityStudy` is highly
recommended.
Indeed, time is often an obviousness criterion to distinguish between
MDO formulations having the same performance in terms of distance to the
optimum: look at our calculation budget and choose the best formulation
that satisfies this budget, or even saves us time. Thus, it is important
to carefully define these cost functions.
"""
from __future__ import absolute_import, division, unicode_literals
import numbers
import os
import pickle
from builtins import int, open
from copy import deepcopy
import matplotlib.pyplot as plt
from future import standard_library
from matplotlib.lines import Line2D
from numpy import array, inf, median
from past.builtins import basestring
from gemseo.problems.scalable.problem import ScalableProblem
standard_library.install_aliases()
from gemseo import LOGGER
CURRENT_DIRECTORY = os.getcwd()
RESULTS_DIRECTORY = "results"
POST_DIRECTORY = "visualization"
POSTOPTIM_DIRECTORY = os.path.join(POST_DIRECTORY, "optimization_history")
POSTSTUDY_DIRECTORY = os.path.join(POST_DIRECTORY, "scalability_study")
POSTSCAL_DIRECTORY = os.path.join(POST_DIRECTORY, "dependency_matrix")
[docs]class ScalabilityStudy(object):
"""Scalability Study."""
def __init__(
self,
objective,
design_variables,
directory="study",
prefix="",
eq_constraints=None,
ineq_constraints=None,
maximize_objective=False,
fill_factor=0.7,
active_probability=0.1,
feasibility_level=0.8,
start_at_equilibrium=True,
early_stopping=True,
coupling_variables=None,
):
"""
The constructor of the ScalabilityStudy class requires two mandatory
arguments:
- the :code:`'objective'` name,
- the list of :code:`'design_variables'` names.
Concerning output files, we can specify:
- the :code:`directory` which is :code:`'study'` by default,
- the prefix of output file names (default: no prefix).
Regarding optimization parametrization, we can specify:
- the list of equality constraints names (:code:`eq_constraints`),
- the list of inequality constraints names (:code:`ineq_constraints`),
- the choice of maximizing the objective function
(:code:`maximize_objective`).
By default, the objective function is minimized and the MDO problem
is unconstrained.
Last but not least, with regard to the scalability methodology,
we can overwrite:
- the default fill factor of the input-output dependency matrix
:code:`ineq_constraints`,
- the probability to set the inequality constraints as active at
initial step of the optimization :code:`active_probability`,
- the offset of satisfaction for inequality constraints
:code:`feasibility_level`,
- the use of a preliminary MDA to start at equilibrium
:code:`start_at_equilibrium`,
- the post-processing of the optimization database to get results
earlier than final step :code:`early_stopping`.
:param str objective: name of the objective
:param list(str) design_variables: names of the design variables
:param str directory: working directory of the study. Default: 'study'.
:param str prefix: prefix for the output filenames. Default: ''.
:param list(str) eq_constraints: names of the equality constraints.
Default: None.
:param list(str) ineq_constraints: names of the inequality constraints
Default: None.
:param bool maximize_objective: maximizing objective. Default: False.
:param float fill_factor: default fill factor of the input-output
dependency matrix. Default: 0.7.
:param float active_probability: probability to set the inequality
constraints as active at initial step of the optimization.
Default: 0.1
:param float feasibility_level: offset of satisfaction for inequality
constraints. Default: 0.8.
:param bool start_at_equilibrium: start at equilibrium
using a preliminary MDA. Default: True.
:param bool early_stopping: post-process the optimization database
to get results earlier than final step.
"""
LOGGER.info("*** Start a scalability study ***")
self.prefix = prefix
self.directory = directory
self.__create_directories()
self.datasets = []
self.objective = objective
self.design_variables = design_variables
if eq_constraints is None:
eq_constraints = []
self.eq_constraints = eq_constraints
if ineq_constraints is None:
ineq_constraints = []
self.ineq_constraints = ineq_constraints
if coupling_variables is None:
coupling_variables = []
self.coupling_variables = coupling_variables
self.maximize_objective = maximize_objective
self.formulations = []
self.formulations_options = []
self.algorithms = []
self.algorithms_options = []
self.scalings = []
self.var_scalings = []
self.__check_fill_factor(fill_factor)
self._default_fill_factor = fill_factor
self.__check_proportion(active_probability)
self.active_probability = active_probability
if not hasattr(feasibility_level, "__len__"):
self.__check_proportion(feasibility_level)
else:
for _, value in feasibility_level.items():
self.__check_proportion(value)
self.feasibility_level = feasibility_level
self.start_at_equilibrium = start_at_equilibrium
self.results = []
self.early_stopping = early_stopping
self._group_dep = {}
self._fill_factor = {}
self.top_level_diff = []
self._all_data = None
LOGGER.info("Initialize the scalability study:")
optimize = "maximize" if self.maximize_objective else "minimize"
LOGGER.info("| Objective: %s %s", optimize, self.objective)
tmp = ", ".join(self.design_variables)
LOGGER.info("| Design variables: %s", tmp)
tmp = ", ".join(self.eq_constraints)
LOGGER.info("| Equality constraints: %s", tmp)
tmp = ", ".join(self.ineq_constraints)
LOGGER.info("| Inequality constraints: %s", tmp)
LOGGER.info("| Default fill factor: %s", self._default_fill_factor)
LOGGER.info("| Active probability: %s", self.active_probability)
LOGGER.info("| Feasibility level: %s", self.feasibility_level)
LOGGER.info("| Start at equilibrium: %s", self.start_at_equilibrium)
LOGGER.info("| Early stopping: %s", self.early_stopping)
def __create_directories(self):
"""Create the different directories
to store results, post-processings, ..."""
LOGGER.info("Create directories:")
self.__create_directory(self.directory)
LOGGER.info("| Working directory: %s", self.directory)
directory = os.path.join(self.directory, POST_DIRECTORY)
self.__create_directory(directory)
LOGGER.info("| Post-processing: %s", directory)
directory = os.path.join(self.directory, POSTOPTIM_DIRECTORY)
self.__create_directory(directory)
LOGGER.info("| Optimization history view: %s", directory)
directory = os.path.join(self.directory, POSTSTUDY_DIRECTORY)
self.__create_directory(directory)
LOGGER.info("| Scalability views: %s", directory)
directory = os.path.join(self.directory, POSTSCAL_DIRECTORY)
self.__create_directory(directory)
LOGGER.info("| Dependency matrices: %s", directory)
directory = os.path.join(self.directory, RESULTS_DIRECTORY)
self.__create_directory(directory)
LOGGER.info("| Results: %s", directory)
@staticmethod
def __create_directory(path):
"""Create a directory from its relative or absolute path.
:param str path: relative or absolute directory path.
"""
if not os.path.exists(path):
os.mkdir(path)
[docs] def add_discipline(self, name, data):
"""This method adds a disciplinary dataset from:
- its :code:`name`,
- its :code:`data`,
:param str name: name of the discipline.
:param AbstractFullCache data: dataset provided as a cache.
"""
self._group_dep[name] = {}
self._all_data = data.get_all_data()
datum = self._all_data[1]
outputs_names = [str(fct_name) for fct_name in list(datum["outputs"].keys())]
outputs_sizes = [
str(fct_name) + "(" + str(len(fct_val)) + ")"
for fct_name, fct_val in datum["outputs"].items()
]
inputs_sizes = [
str(fct_name) + "(" + str(len(fct_val)) + ")"
for fct_name, fct_val in datum["inputs"].items()
]
self.datasets.append(data)
for output_name in outputs_names:
self.set_fill_factor(name, output_name, self._default_fill_factor)
LOGGER.info("Add a discipline:")
LOGGER.info("| Name: %s", name)
LOGGER.info("| Inputs: %s", ", ".join(inputs_sizes))
LOGGER.info("| Outputs: %s", ", ".join(outputs_sizes))
LOGGER.info("| Sample size: %s", data.get_length())
LOGGER.info("| Cache: %s", data.name)
@property
def disciplines_names(self):
"""Get discipline names.
:return: list of discipline names
:rtype: list(str)
"""
disc_names = [discipline.name for discipline in self.datasets]
return disc_names
[docs] def set_fill_factor(self, discipline, output, fill_factor):
"""
:param str discipline: name of the discipline
:param str output: name of the output function
:param float fill_factor: fill factor
"""
self.__check_discipline(discipline)
self.__check_output(discipline, output)
self.__check_fill_factor(fill_factor)
if discipline not in self._fill_factor:
self._fill_factor[discipline] = {}
self._fill_factor[discipline][output] = fill_factor
def __check_discipline(self, discipline):
"""Check if discipline is a string
comprised in the list of disciplines names.
"""
if not isinstance(discipline, basestring):
raise TypeError("The argument discipline should be a string")
disciplines_names = self.disciplines_names
if discipline not in disciplines_names:
raise ValueError(
"The argument discipline should be a string " "comprised in the list ",
disciplines_names,
)
def __check_output(self, discipline, output):
""" Check if output is an output function of discipline. """
self.__check_discipline(discipline)
if not isinstance(output, basestring):
raise TypeError("The argument output should be a string")
outputs_names = next(
cache.outputs_names for cache in self.datasets if cache.name == discipline
)
if output not in outputs_names:
raise ValueError(
"The argument output should be a string " "comprised in the list ",
outputs_names,
)
def __check_inputs(self, discipline, inputs):
""" Check if inputs is a list of inputs of discipline. """
self.__check_discipline(discipline)
inputs_names = next(
cache.inputs_names for cache in self.datasets if cache.name == discipline
)
if not isinstance(inputs, list):
raise TypeError("The argument inputs must be a list of string.")
for inpt in inputs:
self.__check_input(inpt, inputs_names)
@staticmethod
def __check_input(inpt, inputs_names):
""" Check if input is a string comprised in inputs_names. """
if not isinstance(inpt, basestring):
raise TypeError("The argument inpt must be a string.")
if inpt not in inputs_names:
raise ValueError(
"The argument inpt must be a string " "comprised in the list ",
inputs_names,
)
def __check_fill_factor(self, fill_factor):
"""Check if fill factor is a proportion or a number equal to -1
:param float fill_factor: a proportion or -1
"""
try:
self.__check_proportion(fill_factor)
except ValueError:
if fill_factor != -1:
raise TypeError(
"Fill factor should be "
"a float number comprised in 0 and 1 "
"or a number equal to -1."
)
@staticmethod
def __check_proportion(proportion):
"""Check if a proportion is a float number comprised in 0 and 1.
:param float proportion: proportion comprised in 0 and 1.
"""
if not isinstance(proportion, numbers.Number):
raise TypeError(
"A proportion should be " "a float number comprised in 0 and 1."
)
if proportion < 0 or proportion > 1:
raise ValueError(
"A proportion should be " "a float number comprised in 0 and 1."
)
[docs] def add_optimization_strategy(
self,
algo,
max_iter,
formulation="DisciplinaryOpt",
algo_options=None,
formulation_options=None,
top_level_diff="auto",
):
"""Add both optimization algorithm and MDO formulation,
as well as their options.
:param str algo: name of the optimization algorithm.
:param int max_iter: maximum number of iterations
for the optimization algorithm.
:param str formulation: name of the MDO formulation.
Default: 'DisciplinaryOpt'.
:param dict algo_options: options of the optimization algorithm.
:param dict formulation_options: options of the MDO formulation.
:param str top_level_diff: differentiation method
for the top level disciplines. Default: 'auto'.
"""
self.algorithms.append(algo)
if algo_options is None:
algo_options = {}
else:
if not isinstance(algo_options, dict):
raise TypeError("algo_options must be a dictionary.")
algo_options.update({"max_iter": max_iter})
self.algorithms_options.append(algo_options)
self.formulations.append(formulation)
self.formulations_options.append(formulation_options)
self.top_level_diff.append(top_level_diff)
[docs] def print_optimization_strategies(self):
""" Print the list of the optimization strategies. """
index = 0
print("Optimization strategies:")
for algo, formulation in zip(self.algorithms, self.formulations):
template = "# {} - Algorithm: {} {} - Formulation: {} {}"
form_opt = self.formulations_options[index]
form_opt = "" if form_opt is None else form_opt
algo_opt = self.algorithms_options[index]
algo_opt = "" if algo_opt is None else algo_opt
index += 1
print(template.format(index, algo, algo_opt, formulation, form_opt))
[docs] def add_scaling_strategies(
self,
design_size=None,
coupling_size=None,
eq_cstr_size=None,
ineq_cstr_size=None,
variables=None,
):
"""Add different scaling strategies.
:param design_size: size of the design variables. Default: None.
:type design_size: int or list(int)
:param coupling_size: size of the coupling variables. Default: None.
:type coupling_size: int or list(int)
:param eq_cstr_size: size of the equality constraints. Default: None.
:type eq_cstr_size: int or list(int)
:param ineq_cstr_size: size of the inequality constraints.
Default: None.
:type ineq_cstr_size: int or list(int)
"""
n_design = self.__check_varsizes_type(design_size)
n_coupling = self.__check_varsizes_type(coupling_size)
n_eq = self.__check_varsizes_type(eq_cstr_size)
n_ineq = self.__check_varsizes_type(ineq_cstr_size)
n_var = self.__check_varsizes_type(variables)
n_scaling = max(n_design, n_coupling, n_eq, n_ineq, n_var)
self.__check_scaling_consistency(n_design, n_scaling)
self.__check_scaling_consistency(n_coupling, n_scaling)
self.__check_scaling_consistency(n_eq, n_scaling)
self.__check_scaling_consistency(n_ineq, n_scaling)
design_size = self.__format_scaling(design_size, n_scaling)
coupling_size = self.__format_scaling(coupling_size, n_scaling)
eq_cstr_size = self.__format_scaling(eq_cstr_size, n_scaling)
ineq_cstr_size = self.__format_scaling(ineq_cstr_size, n_scaling)
for idx in range(n_scaling):
var_scaling = {}
self.__update_var_scaling(
var_scaling, design_size[idx], self.design_variables
)
self.__update_var_scaling(
var_scaling, coupling_size[idx], self.coupling_variables
)
self.__update_var_scaling(
var_scaling, eq_cstr_size[idx], self.eq_constraints
)
self.__update_var_scaling(
var_scaling, ineq_cstr_size[idx], self.ineq_constraints
)
scaling = {
"design_variables": design_size[idx],
"coupling_variables": coupling_size[idx],
"eq_constraint_size": eq_cstr_size[idx],
"ineq_constraint_size": ineq_cstr_size[idx],
}
if variables is not None:
for varname, value in variables[idx].items():
self.__update_var_scaling(var_scaling, value, [varname])
scaling[varname] = value
self.var_scalings.append(var_scaling)
self.scalings.append(scaling)
@staticmethod
def __format_scaling(size, n_scaling):
"""Convert a scaling size in a list of integers
whose length is equal to the number of scalings.
:param size: size(s) of a given variable
:type size: int or list(int)
:param int n_scaling: number of scalings
:return: formatted sizes
"""
formatted_sizes = size
if isinstance(formatted_sizes, int) or formatted_sizes is None:
formatted_sizes = [formatted_sizes]
if len(formatted_sizes) == 1:
formatted_sizes = formatted_sizes * n_scaling
return formatted_sizes
@staticmethod
def __update_var_scaling(scaling, size, varnames):
"""Update a scaling dictionary for a given list of variables
and a given size.
:param dict scaling: scaling dictionary whose keys are variable names
and values are dictionary with scaling properties,
e.g. {'size': val}
:param int size: size of the variable
:param list(str) varnames: list of variable names
"""
if size is not None:
scaling.update({varname: size for varname in varnames})
@staticmethod
def __check_scaling_consistency(n_var_scaling, n_scaling):
"""Check that for the different types of variables,
the number of scalings is the same or equal to 1.
:param int n_var_scaling: number of scalings
:param int n_scaling: expected number of scalings
"""
assert n_var_scaling in (n_scaling, 1)
@staticmethod
def __check_varsizes_type(varsizes):
"""Check the type of scaling sizes. Integer, list of integers
or None is expected. Return the number of scalings.
:return: length of scalings
"""
length = 1
if varsizes is not None:
if isinstance(varsizes, list):
for size in varsizes:
if isinstance(size, dict):
for _, value in size.items():
assert isinstance(value, int)
else:
assert isinstance(size, int)
length = len(varsizes)
else:
assert isinstance(varsizes, int)
length = 1
return length
[docs] def set_early_stopping(self):
"""Enable early stopping."""
self.early_stopping = True
[docs] def unset_early_stopping(self):
"""Disable early stopping."""
self.early_stopping = False
[docs] def print_scaling_strategies(self):
""" Print the list of the scaling strategies. """
index = 0
print("Scaling strategies:")
for scaling in self.scalings:
index += 1
msg = "# " + str(index)
for var_type, var_size in scaling.items():
msg += " - "
msg += var_type + ": " + str(var_size)
print(msg)
[docs] def execute(self, n_replicates=1):
"""Execute the scalability study, one or several times to take into
account the random features of the scalable problems.
:param int n_replicates: number of times the scalability study
is repeated. Default: 1.
"""
LOGGER.info("Execute scalability study %s times", n_replicates)
if not self.formulations and not self.algorithms:
raise ValueError(
"A scalable study needs at least "
"1 optimization strategy, "
"defined by a mandatory optimization algorithm, "
"and optional optimization algorithm, "
"optimization options"
)
index = 0
for algo, formulation in zip(self.algorithms, self.formulations):
for id_scaling, scaling in enumerate(self.var_scalings):
for replicate in range(1, n_replicates + 1):
problem = ScalableProblem(
self.datasets,
self.design_variables,
self.objective,
self.eq_constraints,
self.ineq_constraints,
self.maximize_objective,
scaling,
fill_factor=self._fill_factor,
seed=replicate,
group_dep=self._group_dep,
force_input_dependency=True,
allow_unused_inputs=False,
)
varnames = [algo, formulation, id_scaling + 1, replicate]
name = "_".join([self.prefix] + [str(var) for var in varnames])
if name[0] == "_":
name = name[1:]
directory = os.path.join(self.directory, POSTSCAL_DIRECTORY, name)
self.__create_directory(directory)
problem.plot_dependencies(True, False, directory)
form_opt = self.formulations_options[index]
if not isinstance(form_opt, dict):
formulation_options = {}
else:
formulation_options = form_opt
problem.create_scenario(
formulation,
"MDO",
self.start_at_equilibrium,
self.active_probability,
self.feasibility_level,
**formulation_options
)
top_level_disciplines = (
problem.scenario.formulation.get_top_level_disc()
)
for disc in top_level_disciplines:
disc.linearization_mode = self.top_level_diff[index]
algo_options = deepcopy(self.algorithms_options[index])
max_iter = algo_options["max_iter"]
del algo_options["max_iter"]
problem.scenario.execute(
{
"algo": algo,
"max_iter": max_iter,
"algo_options": algo_options,
}
)
stopidx, n_iter = self.__get_stop_index(problem)
ratio = float(stopidx) / n_iter
self.__save_opt_history_view(
problem,
algo + "_" + formulation,
str(id_scaling + 1),
str(replicate),
)
n_calls = {
disc: n_calls * ratio
for disc, n_calls in problem.n_calls.items()
}
tmp = problem.n_calls_linearize
n_calls_linearize = {disc: ncl * ratio for disc, ncl in tmp.items()}
tmp = problem.n_calls_top_level
n_calls_tl = {
disc: n_calls * ratio for disc, n_calls in tmp.items()
}
tmp = problem.n_calls_linearize_top_level
n_calls_linearize_tl = {
disc: ncltl * ratio for disc, ncltl in tmp.items()
}
exec_time = problem.exec_time()
exec_time *= ratio
status = problem.status
is_feasible = problem.is_feasible
result = ScalabilityResult(name, id_scaling + 1, replicate)
self.results.append(result)
inputs = problem.inputs
outputs = problem.outputs
disc_varnames = {
disc: inputs[disc] + outputs[disc]
for disc in problem.disciplines
}
sizes = problem.varsizes
new_varsizes = {
disc: {
name: scaling.get(name, sizes[name])
for name in disc_varnames[disc]
}
for disc in problem.disciplines
}
old_varsizes = problem.varsizes
result.get(
algo,
algo_options,
formulation,
formulation_options,
scaling,
n_calls,
n_calls_linearize,
n_calls_tl,
n_calls_linearize_tl,
exec_time,
status,
is_feasible,
problem.disciplines,
problem.outputs,
old_varsizes,
new_varsizes,
)
result.save(self.directory)
index += 1
return self.results
def __save_opt_history_view(self, problem, optim, scaling, replicate):
"""Save optimization history view
according to a directory tree structure.
:param ScalableProblem problem: scalable problem
:param str optim: optimization algorithm and MDO formulation
:param str scaling: scaling index
:param str replicate: replicate index
"""
LOGGER.info("Save optimization history view")
fpath = os.path.join(
self.directory, POSTOPTIM_DIRECTORY, "_".join([self.prefix] + [optim])
)
self.__create_directory(fpath)
fpath = os.path.join(fpath, "scaling_" + scaling)
self.__create_directory(fpath)
fpath = os.path.join(fpath, "replicate_" + replicate)
self.__create_directory(fpath)
fpath = fpath + "/"
problem.scenario.post_process(
"OptHistoryView", save=True, show=False, file_path=fpath
)
def __get_stop_index(self, problem):
"""Get stop index from a database.
:param ScalableProblem problem: scalable problem
:return: stop index, database length
:rtype: int, int
"""
database = problem.scenario.formulation.opt_problem.database
n_iter = len(database)
if self.early_stopping:
y_prev = inf
stopidx = 0
for _, value in database.items():
pbm = problem.scenario.formulation.opt_problem
diff = abs(y_prev - value[pbm.get_objective_name()])
diff /= abs(y_prev)
if diff < 1e-6:
break
y_prev = value[pbm.get_objective_name()]
stopidx += 1
else:
stopidx = n_iter
return stopidx, n_iter
[docs]class ScalabilityResult(object):
"""Scalability Result."""
def __init__(self, name, id_scaling, id_sample):
"""Constructor.
:param str name: name of the scalability result.
:param int id_scaling: scaling identifiant
:param int id_sample: sample identifiant
"""
self.name = name
self.id_scaling = id_scaling
self.id_sample = id_sample
self.algo = None
self.algo_options = None
self.formulation_options = None
self.formulation = None
self.scaling = None
self.n_calls = None
self.n_calls_linearize = None
self.n_calls_top_level = None
self.n_calls_linearize_top_level = None
self.exec_time = None
self.original_exec_time = None
self.status = None
self.is_feasible = None
self.disc_names = None
self.old_varsizes = None
self.new_varsizes = None
self.output_names = None
[docs] def get(
self,
algo,
algo_options,
formulation,
formulation_options,
scaling,
n_calls,
n_calls_linearize,
n_calls_top_level,
n_calls_linearize_top_level,
exec_time,
status,
is_feasible,
disc_names,
output_names,
old_varsizes,
new_varsizes,
):
"""Get a scalability result for a given optimization strategy
and a given scaling strategy.
:param str algo: name of the optimization algorithm
:param dict algo_options: options of the optimization algorithm
:param str formulation: name of the MDO formulation
:param dict formulation_options: options of the MDO formulation
:param scaling: scaling strategy
:param list(int) n_calls: number of calls for each discipline
:param list(int) n_calls_linearize: number of linearization
for each discipline
:param list(int) n_calls_top_level: number of calls for each discipline
:param list(int) n_calls_linearize_top_level: number of linearization
for each discipline
:param float exec_time: execution time
:param int status: status of the optimization scenario
:param bool is_feasible: feasibility of the optimization solution
:param list(str) disc_names: list of discipline names
:param dict output_names: list of output names
:param dict old_varsizes: old variable sizes
:param dict new_varsizes: new variable sizes
"""
self.algo = algo
self.algo_options = algo_options
self.formulation = formulation
self.formulation_options = formulation_options
self.scaling = scaling
self.n_calls = n_calls
self.n_calls_linearize = n_calls_linearize
self.n_calls_top_level = n_calls_top_level
self.n_calls_linearize_top_level = n_calls_linearize_top_level
self.exec_time = exec_time
self.status = status
self.is_feasible = is_feasible
self.disc_names = disc_names
self.output_names = output_names
self.old_varsizes = old_varsizes
self.new_varsizes = new_varsizes
[docs] def save(self, study_directory):
"""Save a scalability result into a pickle file
whose name is the name of the ScalabilityResult instance."""
fpath = os.path.join(study_directory, RESULTS_DIRECTORY, self.name + ".pkl")
result = {
"algo": self.algo,
"algo_options": self.algo_options,
"formulation": self.formulation,
"formulation_options": self.formulation_options,
"scaling": self.scaling,
"n_calls": self.n_calls,
"n_calls_linearize": self.n_calls_linearize,
"n_calls_top_level": self.n_calls_top_level,
"n_calls_linearize_top_level": self.n_calls_linearize_top_level,
"exec_time": self.exec_time,
"status": self.status,
"is_feasible": self.is_feasible,
"disc_names": self.disc_names,
"output_names": self.output_names,
"old_varsizes": self.old_varsizes,
"new_varsizes": self.new_varsizes,
}
with open(fpath, "wb") as fout:
pickle.dump(result, fout)
[docs] def load(self, study_directory):
"""Load a scalability result from a pickle file
whose name is the name of the ScalabilityResult instance."""
fpath = os.path.join(study_directory, RESULTS_DIRECTORY, self.name + ".pkl")
with open(fpath, "rb") as fin:
result = pickle.load(fin)
self.algo = result["algo"]
self.formulation = result["formulation"]
self.scaling = result["scaling"]
self.n_calls = result["n_calls"]
self.n_calls_linearize = result["n_calls_linearize"]
self.n_calls_top_level = result["n_calls_top_level"]
self.n_calls_linearize_top_level = result["n_calls_linearize_top_level"]
self.exec_time = result["exec_time"]
self.status = result["status"]
self.is_feasible = result["is_feasible"]
self.disc_names = result["disc_names"]
self.output_names = result["output_names"]
self.old_varsizes = result["old_varsizes"]
self.new_varsizes = result["new_varsizes"]
[docs]class PostScalabilityStudy(object):
"""The PostScalabilityStudy class aims to post-process a list of
scalability results stored in a directory.
"""
NOMENCLATURE = {
"exec_time": "Execution time (s)",
"original_exec_time": "Pseudo-original execution time",
"n_calls": "Number of discipline evaluations",
"n_calls_linearize": "Number of gradient evaluations",
"status": "Optimizatin status",
"is_feasible": "Feasibility of the solution (0 or 1)",
"scaling_strategy": "Scaling strategy index",
"total_calls": "Total number of evaluations",
}
def __init__(self, study_directory):
"""Constructor.
:param str study: directory of the scalability study."
"""
LOGGER.info("*** Start post-processing for scalability study ***")
LOGGER.info("Working directory: %s", study_directory)
self.study_directory = study_directory
self.scalability_results = self.__load_results()
self.n_results = len(self.scalability_results)
self.descriptions = self.NOMENCLATURE
self.cost_function = {}
self.unit_cost = None
for result in self.scalability_results:
result.total_calls = sum(result.n_calls.values())
result.total_calls += sum(result.n_calls_linearize.values())
[docs] def set_cost_function(self, formulation, cost):
"""Set cost function for each formulation.
:param str formulation: name of the formulation.
:param function cost: cost function
"""
self.cost_function[formulation] = cost
[docs] def set_cost_unit(self, cost_unit):
"""Set the measurement unit for cost evaluation.
:param str cost_unit: cost unit, e.g. 'h', 'min', ...
"""
self.unit_cost = cost_unit
desc = self.descriptions["original_exec_time"]
desc = desc.split(" (")[0]
desc += " (" + cost_unit + ")"
self.descriptions["original_exec_time"] = desc
[docs] def labelize_exec_time(self, description):
"""Change the description of execution time,
used in plotting methods notably.
:param str description: description.
"""
self._update_descriptions("exec_time", description)
[docs] def labelize_original_exec_time(self, description):
"""Change the description of original execution time,
used in plotting methods notably.
:param str description: description.
"""
self._update_descriptions("original_exec_time", description)
if self.unit_cost is not None:
self.set_cost_unit(self.unit_cost)
[docs] def labelize_n_calls(self, description):
"""Change the description of number of calls,
used in plotting methods notably.
:param str description: description.
"""
self._update_descriptions("n_calls", description)
[docs] def labelize_n_calls_linearize(self, description):
"""Change the description of number of calls for linearization,
used in plotting methods notably.
:param str description: description.
"""
self._update_descriptions("n_calls_linearize", description)
[docs] def labelize_status(self, description):
"""Change the description of status,
used in plotting methods notably.
:param str description: description.
"""
self._update_descriptions("status", description)
[docs] def labelize_is_feasible(self, description):
"""Change the description of feasibility,
used in plotting methods notably.
:param str description: description.
"""
self._update_descriptions("is_feasible", description)
[docs] def labelize_scaling_strategy(self, description):
"""Change the description of scaling strategy,
used in plotting methods notably.
:param str description: description.
"""
self._update_descriptions("scaling_strategy", description)
def _update_descriptions(self, keyword, description):
"""Update the description initialized
with the NOMENCLATURE class attribute.
:param str keyword: keyword of the considered object.
:param str description: new description
"""
if not self.descriptions.get(keyword):
raise ValueError(
"The keyword " + keyword + " is not "
"in the list: " + ", ".join(list(self.descriptions.keys()))
)
if not isinstance(description, basestring):
raise TypeError(
'The argument "description" must be '
"of type string, "
"not of type " + description.__class__.__name__
)
self.descriptions[keyword] = description
def __load_results(self):
""" Load results from the results directory of the study path."""
if not os.path.isdir(self.study_directory):
raise ValueError('Directory "' + self.study_directory + '" does not exist.')
directory = os.path.join(self.study_directory, RESULTS_DIRECTORY)
if not os.path.isdir(directory):
raise ValueError('Directory "' + directory + '" does not exist.')
filenames = [
filename
for filename in os.listdir(directory)
if os.path.isfile(os.path.join(directory, filename))
]
results = []
for filename in filenames:
name = filename.split(".")[0]
id_scaling = int(name.split("_")[-2])
id_sample = int(name.split("_")[-1])
result = ScalabilityResult(name, id_scaling, id_sample)
result.load(self.study_directory)
results.append(result)
if not results:
raise ValueError("Directory " + directory + " is empty.")
return results
[docs] def plot(
self,
legend_loc="upper left",
xticks=None,
xticks_labels=None,
xmargin=0.0,
**options
):
"""Plot the results using different methods
according to the presence or absence of replicate values.
:param str legend_loc: legend localization
:param list(float) xticks: list of xticks (default: None)
:param list(str) xticks_labels: list of xticks labels (default: None)
:param float xmargin: margin on left and right sides of the x-axis
:param options: options for the specialized plot methods
"""
LOGGER.info("Execute post-processing")
self.__has_scaling_dimension(xticks)
self.__has_scaling_dimension(xticks_labels)
there_are_replicates = len(self.get_replicates(True)) > 1
if self.cost_function:
self._estimate_original_time()
if there_are_replicates:
LOGGER.info("| Type: replicate")
self._replicate_plot(legend_loc, xticks, xticks_labels, xmargin, **options)
else:
LOGGER.info("| Type: standard")
self._standard_plot(legend_loc, xticks, xticks_labels, xmargin)
def __has_scaling_dimension(self, value):
"""Assert if a value has the scaling dimension.
:param ndarray value: value.
"""
if value is not None and hasattr(value, "__len__"):
assert len(value) == len(self.get_scaling_strategies(True))
def _standard_plot(
self, legend_loc="upper left", xticks=None, xticks_labels=None, xmargin=0.0
):
"""Deterministic plot.
:param str legend_loc: legend localization
:param list(float) xticks: list of xticks (default: None)
:param list(str) xticks_labels: list of xticks labels (default: None)
:param float xmargin: margin on left and right sides of the x-axis
"""
colors = ["blue", "red", "green"]
idos = -1
for optim_strategy in self.get_optimization_strategies(True):
idos += 1
color = colors[idos]
xcoord, values = self.__get_values(optim_strategy)
for ylabel, ycoord in values.items():
if hasattr(ycoord, "shape"):
plt.figure(ylabel)
if len(ycoord.shape) == 3:
n_criteria = ycoord.shape[0]
else:
n_criteria = 1
linestyles = [
(0, (id_criterion + 1, n_criteria))
for id_criterion in range(n_criteria)
]
for criterion in range(n_criteria):
if len(ycoord.shape) == 3:
yycoord = ycoord[criterion, :, 0].flatten()
else:
yycoord = ycoord[:, 0].flatten()
yval = yycoord
xval = list(xcoord)
if xticks is not None:
assert hasattr(xticks, "__len__")
assert len(xticks) == len(xval)
xval = xticks
else:
xticks = xval
plt.plot(
xval,
yval,
"o",
linestyle=linestyles[criterion],
color=color,
)
if xticks_labels is None:
xticks_labels = xticks
else:
xticks_labels = xval
plt.xticks(xticks, xticks_labels)
plt.xlabel(self.descriptions["scaling_strategy"])
plt.ylabel(self.descriptions[ylabel])
plt.xlim(xval[0] - xmargin, xval[-1] + xmargin)
problem_legends = self.get_optimization_strategies(True)
legend_elements = [
Line2D([0], [0], color=colors[idx], lw=4, label=problem_legends[idx])
for idx in range(len(problem_legends))
]
for ylabel, ycoord in values.items():
if hasattr(ycoord, "shape"):
plt.figure(ylabel)
plt.legend(
handles=legend_elements,
loc=legend_loc,
frameon=False,
framealpha=0.5,
)
fpath = os.path.join(
self.study_directory, POSTSTUDY_DIRECTORY, ylabel + ".pdf"
)
plt.savefig(fpath)
@property
def n_samples(self):
""" Number of samples """
return len(self.get_replicates(True))
def _replicate_plot(
self,
legend_loc="upper left",
xticks=None,
xticks_labels=None,
xmargin=0.0,
minbox=2,
notch=False,
widths=0.25,
whis=1.5,
):
"""Probabilistic plot.
:param str legend_loc: legend localization
:param list(float) xticks: list of xticks (default: None)
:param list(str) xticks_labels: list of xticks labels (default: None)
:param float xmargin: margin on left and right sides of the x-axis
:param int minbox: minimal number of values for boxplot (default: 2).
:param bool notch: if True, will produced a notched boxplot.
:param float whis: the reach of the whiskers to the beyond
the first and third quartiles (default: 1.5).
"""
if not hasattr(widths, "__len__"):
widths = [widths] * len(self.get_scaling_strategies(True))
else:
self.__has_scaling_dimension(widths)
n_samples = len(self.get_replicates(True))
colors = ["blue", "red", "green"]
if n_samples >= minbox:
idos = -1
n_strategies = len(self.get_optimization_strategies(True))
for optim_strategy in self.get_optimization_strategies(True):
idos += 1
xcoord, values = self.__get_values(optim_strategy)
for ylabel, ycoord in values.items():
if hasattr(ycoord, "shape") and ylabel != "status":
plt.figure(ylabel)
xval = list(xcoord)
if xticks is not None:
assert hasattr(xticks, "__len__")
assert len(xticks) == len(xval)
xval = xticks
tmp = []
for idx, xtick in enumerate(xval):
tmp.append(float(idos) / n_strategies)
tmp[-1] *= widths[idx] * 3
tmp[-1] += xtick
self.__draw_boxplot(
ycoord, tmp, colors[idos], colors[idos], notch, widths, whis
)
if len(ycoord.shape) == 3:
zval = ycoord[0, :, :]
else:
zval = ycoord
xval_offset = [
xtick + float(idos) / n_strategies * widths[idx] * 3
for idx, xtick in enumerate(xval)
]
plt.plot(xval_offset, median(zval, 1), "--", color=colors[idos])
if xticks_labels is None:
xticks_labels = xval
plt.xticks(xval, xticks_labels)
plt.xlim(xval[0] - xmargin, xval[-1] + xmargin)
plt.xlabel(self.descriptions["scaling_strategy"])
plt.ylabel(self.descriptions[ylabel])
plt.yscale("log")
plt.grid(True, "both")
problem_legends = self.get_optimization_strategies(True)
legend_elements = [
Line2D([0], [0], color=colors[idx], lw=4, label=problem_legends[idx])
for idx in range(len(problem_legends))
]
for ylabel, ycoord in values.items():
if hasattr(ycoord, "shape") and ylabel != "status":
plt.figure(ylabel)
plt.legend(
handles=legend_elements,
loc=legend_loc,
frameon=False,
framealpha=0.5,
)
fpath = os.path.join(
self.study_directory, POSTSTUDY_DIRECTORY, ylabel + ".pdf"
)
plt.savefig(fpath)
@staticmethod
def __draw_boxplot(data, xticks, edge_color, fill_color, notch, widths, whis):
"""Draw boxplot from a dataset.
:param array data: dataset array of dimension 2 or 3
:param list(float) xticks: values of xticks
:param str edge_color: edge color
:param str fill_color: fill color
:param bool notch: if True, will produced a notched boxplot.
:param list(float) widths: widths of boxplots
:param float whis: the reach of the whiskers to the beyond
the first and third quartiles
"""
if len(data.shape) == 3:
data = data[0, :, :]
boxplot = plt.boxplot(
data.T,
patch_artist=True,
positions=xticks,
showfliers=False,
whiskerprops={"linestyle": "-"},
notch=notch,
widths=widths,
whis=whis,
)
for element in ["boxes", "whiskers", "fliers", "means", "caps"]:
plt.setp(boxplot[element], color=edge_color)
plt.setp(boxplot["medians"], color="white")
for patch in boxplot["boxes"]:
patch.set(facecolor=fill_color)
return boxplot
def __get_values(self, optim_strategy):
"""Get values of criteria and corresponding scaling levels
for a given optimization strategy and all replicates.
:param str optim_strategy: name of the optimization strategy.
:return: scaling levels, criteria values
:rtype: list(int), dict(array)
"""
exec_time = []
original_exec_time = []
n_calls = []
n_calls_linearize = []
status = []
is_feasible = []
scaling = []
total_calls = []
for replicate in self.get_replicates(True):
xcoord, ycoord = self.__get_replicate_values(optim_strategy, replicate)
exec_time.append(ycoord["exec_time"])
total_calls.append(ycoord["total_calls"])
if self.cost_function:
original_exec_time.append(ycoord["original_exec_time"])
tmp = [
[val for _, val in ycoord["n_calls"][idx].items()]
for idx in range(len(ycoord["n_calls"]))
]
n_calls.append(tmp)
tmp = [
[val for _, val in ycoord["n_calls_linearize"][idx].items()]
for idx in range(len(ycoord["n_calls_linearize"]))
]
n_calls_linearize.append(tmp)
status.append(ycoord["status"])
is_feasible.append(ycoord["is_feasible"])
scaling.append(ycoord["scaling"])
scaling_levels = xcoord
values = {}
values["exec_time"] = array(exec_time).T
if self.cost_function:
values["original_exec_time"] = array(original_exec_time).T
values["n_calls"] = array(n_calls).T
values["n_calls_linearize"] = array(n_calls_linearize).T
values["total_calls"] = array(total_calls).T
values["status"] = array(status).T
values["is_feasible"] = array(is_feasible).T
values["scaling"] = scaling
return scaling_levels, values
def __get_replicate_values(self, optim_strategy, replicate):
"""Get values of criteria and corresponding scaling levels
for a given optimization strategy and a given replicate.
:param str optim_strategy: optimization strategy.
:param int replicate: replicate index.
:return: scaling levels, criteria values
:rtype: list(int), dict(array-like)
"""
are_replicate = [value == replicate for value in self.get_replicates()]
are_optim_strategy = [
value == optim_strategy for value in self.get_optimization_strategies()
]
are_ok = [
is_rep and is_oo for is_rep, is_oo in zip(are_replicate, are_optim_strategy)
]
indices = [index for index, is_ok in enumerate(are_ok) if is_ok]
scaling_levels = self.get_scaling_strategies()
scaling_levels = [scaling_levels[index] for index in indices]
tmp = sorted(list(range(len(scaling_levels))), key=lambda k: scaling_levels[k])
indices = [indices[index] for index in tmp]
scaling_levels = [scaling_levels[index] for index in tmp]
results = [self.scalability_results[index] for index in indices]
exec_time = [result.exec_time for result in results]
total_calls = [result.total_calls for result in results]
original_exec_time = [result.original_exec_time for result in results]
n_calls = [result.n_calls for result in results]
n_calls_linearize = [result.n_calls_linearize for result in results]
status = [result.status for result in results]
is_feasible = [result.is_feasible for result in results]
scaling = [result.scaling for result in results]
values = {}
values["exec_time"] = exec_time
values["total_calls"] = total_calls
if self.cost_function:
values["original_exec_time"] = original_exec_time
values["n_calls"] = n_calls
values["n_calls_linearize"] = n_calls_linearize
values["status"] = status
values["is_feasible"] = is_feasible
values["scaling"] = scaling
return scaling_levels, values
@property
def names(self):
""" Get the names of the scalability results. """
return [value.name for value in self.scalability_results]
[docs] def get_optimization_strategies(self, unique=False):
"""Get the names of the optimization strategies.
:param bool unique: return either unique values if True
or one value per scalability result if False (default: False).
:return: names of the optimization stategies.
:rtype: list(str)
"""
os_list = ["_".join(name.split("_")[0:-2]) for name in self.names]
if unique:
os_list = sorted(set(os_list))
return os_list
[docs] def get_scaling_strategies(self, unique=False):
"""Get the identificants of the scaling strategies.
:param bool unique: return either unique values if True
or one value per scalability result if False (default: False).
:return: identifiants of scaling strategies
:rtype: list(int)
"""
ss_list = [int(name.split("_")[-2]) for name in self.names]
if unique:
ss_list = sorted(set(ss_list))
return ss_list
[docs] def get_replicates(self, unique=False):
"""Get the replicate identifiants.
:param bool unique: return either unique values if True
or one value per scalability result if False (default: False).
:return: identifiants of replicates.
:rtype: list(int)
"""
rep = [int(name.split("_")[-1]) for name in self.names]
if unique:
rep = sorted(set(rep))
return rep
def _estimate_original_time(self):
"""Estimate the original execution time from the number of calls
and linearizations of the different disciplines and
top-level disciplines and from the cost functions provided by the
user.
:return: original time
:rtype: float
"""
for scalability_result in self.scalability_results:
n_c = scalability_result.n_calls
n_cl = scalability_result.n_calls_linearize
n_tl_c = scalability_result.n_calls_top_level
n_tl_cl = scalability_result.n_calls_linearize_top_level
varsizes = scalability_result.new_varsizes
formulation = scalability_result.formulation
if formulation not in self.cost_function:
raise ValueError(
"The cost function of "
+ formulation
+ " must be defined in order to compute "
" the estimated original time."
)
result = self.cost_function[formulation](
varsizes, n_c, n_cl, n_tl_c, n_tl_cl
)
scalability_result.original_exec_time = result