Source code for gemseo.utils.derivatives_approx

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#       :author : Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""
Finite differences & complex step approximations
************************************************
"""
from __future__ import division, unicode_literals

import logging
from itertools import chain
from multiprocessing import cpu_count

from matplotlib import pyplot
from numpy import (
    absolute,
    allclose,
    amax,
    arange,
    argmax,
    array,
    atleast_2d,
    complex128,
    concatenate,
    divide,
    finfo,
    float64,
    ndarray,
    ones,
    tile,
    where,
    zeros,
)
from numpy.linalg import norm

from gemseo.core.parallel_execution import ParallelExecution
from gemseo.utils.data_conversion import DataConversion
from gemseo.utils.py23_compat import xrange

EPSILON = finfo(float).eps
LOGGER = logging.getLogger(__name__)



class ComplexStep(object):
    """Complex step, second order gradient calculation.

    Enables a much smaller step than real finite differences,
    typically fd_step = 1e-30, since there is no cancellation error
    due to a difference calculation:

    grad = Imag(f(x + j*fd_step)) / fd_step
    """

    def __init__(self, f_pointer, step=1e-20, parallel=False, **parallel_args):
        """Constructor.

        :param f_pointer: pointer to the function to differentiate
        :param step: differentiation step
        :param parallel: if True, executes in parallel
        :param parallel_args: arguments passed to the parallel execution,
            see gemseo.core.parallel_execution
        """
        self.f_pointer = f_pointer
        self.__par_args = parallel_args
        self.__parallel = parallel
        if step.imag != 0:
            self.step = step.imag
        else:
            self.step = step

    def f_gradient(self, x_vect, step=None, **kwargs):
        """Compute the gradient by complex step.

        :param x_vect: design vector
        :type x_vect: numpy array
        :param step: differentiation step
        :param kwargs: optional arguments for the function
        :returns: function gradient
        :rtype: numpy array
        """
        if norm(x_vect.imag) != 0.0:
            raise ValueError(
                "Impossible to check the gradient at a complex "
                "point using the complex step method!"
            )

        n_dim = len(x_vect)
        grad = []
        if step is None:
            step = self.step
        x_p_arr = self.generate_perturbations(n_dim, x_vect, step)

        if self.__parallel:

            def func_noargs(xval):
                """Call the f_pointer without explicitly passed arguments."""
                return self.f_pointer(xval, **kwargs)

            function_list = [func_noargs] * n_dim
            parallel_execution = ParallelExecution(function_list, **self.__par_args)

            all_x = [x_vect + x_p_arr[:, i] for i in xrange(n_dim)]
            output_list = parallel_execution.execute(all_x)

            for i in xrange(n_dim):
                f_p = output_list[i]
                grad.append(f_p.imag / x_p_arr[i, i].imag)
        else:
            for i in xrange(n_dim):
                x_p = x_vect + x_p_arr[:, i]
                f_p = self.f_pointer(x_p, **kwargs)
                grad.append(f_p.imag / x_p_arr[i, i].imag)

        return array(grad, dtype=float64).T

    def generate_perturbations(self, n_dim, x_vect, step=None):
        """Generate the perturbations x_perturb which will be used to compute
        f(x_vect + x_perturb).

        :param n_dim: dimension
        :type n_dim: integer
        :param x_vect: design vector
        :type x_vect: numpy array
        :param step: differentiation step
        :returns: perturbations
        :rtype: numpy array
        """
        if step is None:
            step = self.step
        x_perturb = zeros((n_dim, n_dim), dtype=complex128)
        x_nnz = where(x_vect == 0.0, 1.0, x_vect)
        x_perturb[range(n_dim), range(n_dim)] = 1j * x_nnz * step
        return x_perturb
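

# Editor-added usage sketch: how ComplexStep can be called on a small analytic
# function. The function and the test point below are illustrative assumptions,
# not part of the gemseo API.
def _example_complex_step():
    """Differentiate f(x) = [sin(x0)*x1, x0**2 + x1] with the complex step."""
    from numpy import array, sin

    def func(x_vect):
        return array([sin(x_vect[0]) * x_vect[1], x_vect[0] ** 2 + x_vect[1]])

    approx = ComplexStep(func, step=1e-30)
    jac = approx.f_gradient(array([1.0, 2.0]))
    # jac[i, j] approximates df_i/dx_j, here close to
    # [[2*cos(1), sin(1)], [2., 1.]]
    return jac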


class FirstOrderFD(object):
    """Finite differences at first order, first order gradient calculation.

    grad = (f(x + fd_step) - f(x)) / fd_step
    """

    def __init__(self, f_pointer, step=1e-6, parallel=False, **parallel_args):
        """Constructor.

        :param f_pointer: pointer to the function to differentiate
        :param step: differentiation step
        :param parallel: if True, executes in parallel
        :param parallel_args: arguments passed to the parallel execution,
            see gemseo.core.parallel_execution
        """
        self.f_pointer = f_pointer
        self.step = step
        self.__par_args = parallel_args
        self.__parallel = parallel

    def f_gradient(self, x_vect, step=None, **kwargs):
        """Compute the gradient by forward finite differences.

        :param x_vect: design vector
        :type x_vect: numpy array
        :param step: differentiation step
        :param kwargs: additional arguments passed to the function
        :returns: function gradient
        :rtype: numpy array
        """
        n_dim = len(x_vect)
        x_p_arr = self.generate_perturbations(n_dim, x_vect, step)
        grad = []
        if step is None:
            step = self.step
        if not isinstance(step, ndarray):
            step = step * ones(n_dim)

        if self.__parallel:

            def func_noargs(xval):
                """Call the f_pointer without explicitly passed arguments."""
                return self.f_pointer(xval, **kwargs)

            function_list = [func_noargs] * (n_dim + 1)
            parallel_execution = ParallelExecution(function_list, **self.__par_args)

            all_x = [x_vect] + [x_p_arr[:, i] for i in xrange(n_dim)]
            output_list = parallel_execution.execute(all_x)

            f_0 = output_list[0]
            for i in xrange(n_dim):
                f_p = output_list[i + 1]
                g_approx = ((f_p - f_0) / step[i]).real
                grad.append(g_approx)
        else:
            f_0 = self.f_pointer(x_vect, **kwargs)
            for i in xrange(n_dim):
                f_p = self.f_pointer(x_p_arr[:, i], **kwargs)
                g_approx = ((f_p - f_0) / step[i]).real
                grad.append(g_approx)

        return array(grad, dtype=float64).T

    def _get_opt_step(self, f_p, f_0, f_m, numerical_error=EPSILON):
        """Compute the optimal step; f may be a vector function.
        In this case, the worst case over the outputs is taken.

        :param f_p: f(x+e)
        :param f_0: f(x)
        :param f_m: f(x-e)
        :param numerical_error: numerical error associated with the calculation
            of f. By default the machine epsilon (approximately 1e-16), but it
            can be higher when the calculation of f requires a numerical
            resolution
        """
        n_out = f_p.size
        if n_out == 1:
            t_e, c_e, opt_step = comp_best_step(
                f_p, f_0, f_m, self.step, epsilon_mach=numerical_error
            )
            if t_e is None:
                error = 0.0
            else:
                error = t_e + c_e
        else:
            errors = zeros(n_out)
            opt_steps = zeros(n_out)
            for i in xrange(n_out):
                t_e, c_e, opt_steps[i] = comp_best_step(
                    f_p[i], f_0[i], f_m[i], self.step, epsilon_mach=numerical_error
                )
                if t_e is None:
                    errors[i] = 0.0
                else:
                    errors[i] = t_e + c_e
            max_i = argmax(errors)
            error = errors[max_i]
            opt_step = opt_steps[max_i]

        return error, opt_step

    def compute_optimal_step(self, x_vect, numerical_error=EPSILON, **kwargs):
        """Compute the optimal step in each direction of the forward finite
        differences approximation.

        :param x_vect: design vector
        :type x_vect: numpy array
        :param numerical_error: numerical error associated with the calculation
            of f. By default the machine epsilon (approximately 1e-16), but it
            can be higher when the calculation of f requires a numerical
            resolution
        :param kwargs: additional arguments passed to the function
        :returns: the optimal steps and the associated error estimates
        :rtype: numpy arrays
        """
        n_dim = len(x_vect)
        x_p_arr = self.generate_perturbations(n_dim, x_vect)
        x_m_arr = self.generate_perturbations(n_dim, x_vect, -self.step)
        opt_steps = self.step * ones(n_dim)
        errors = zeros(n_dim)
        comp_step = self._get_opt_step

        if self.__parallel:

            def func_noargs(xval):
                """Call the f_pointer without explicitly passed arguments."""
                return self.f_pointer(xval, **kwargs)

            function_list = [func_noargs] * (n_dim + 1)
            parallel_execution = ParallelExecution(function_list, **self.__par_args)

            all_x = [x_vect] + [x_p_arr[:, i] for i in xrange(n_dim)]
            all_x += [x_m_arr[:, i] for i in xrange(n_dim)]
            output_list = parallel_execution.execute(all_x)

            f_0 = output_list[0]
            for i in xrange(n_dim):
                f_p = output_list[i + 1]
                f_m = output_list[n_dim + i + 1]
                errs, opt_step = comp_step(
                    f_p, f_0, f_m, numerical_error=numerical_error
                )
                errors[i] = errs
                opt_steps[i] = opt_step
        else:
            f_0 = self.f_pointer(x_vect, **kwargs)
            for i in xrange(n_dim):
                f_p = self.f_pointer(x_p_arr[:, i], **kwargs)
                f_m = self.f_pointer(x_m_arr[:, i], **kwargs)
                errs, opt_step = comp_step(
                    f_p, f_0, f_m, numerical_error=numerical_error
                )
                errors[i] = errs
                opt_steps[i] = opt_step

        self.step = opt_steps
        return opt_steps, errors

    def generate_perturbations(self, n_dim, x_vect, step=None):
        """Generate the perturbations x_perturb which will be used to compute
        f(x_vect + x_perturb).

        :param n_dim: dimension
        :type n_dim: integer
        :param x_vect: design vector
        :type x_vect: numpy array
        :param step: step for the finite differences
        :returns: perturbations x_perturb
        :rtype: numpy array
        """
        if step is None:
            loc_step = self.step
        else:
            loc_step = step
        x_perturb = tile(x_vect, n_dim).reshape((n_dim, n_dim)).T
        x_perturb[xrange(n_dim), xrange(n_dim)] += loc_step
        return x_perturb
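

# Editor-added usage sketch: forward finite differences on a small analytic
# function, followed by the per-direction optimal-step search. The function
# and the test point are illustrative assumptions, not part of the gemseo API.
def _example_first_order_fd():
    """Differentiate f(x) = x0**2 + 3*x1 with forward finite differences."""
    from numpy import array

    def func(x_vect):
        return array([x_vect[0] ** 2 + 3.0 * x_vect[1]])

    approx = FirstOrderFD(func, step=1e-6)
    grad = approx.f_gradient(array([1.0, 2.0]))  # close to [[2., 3.]]

    # Balance truncation and cancellation errors to refine the steps;
    # this also updates approx.step in place.
    opt_steps, errors = approx.compute_optimal_step(array([1.0, 2.0]))
    return grad, opt_steps, errors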


class DisciplineJacApprox(object):
    """Approximates a discipline Jacobian using finite differences or complex step."""

    COMPLEX_STEP = "complex_step"
    FINITE_DIFFERENCES = "finite_differences"

    N_CPUS = cpu_count()

    def __init__(
        self,
        discipline,
        approx_method=FINITE_DIFFERENCES,
        step=1e-7,
        parallel=False,
        n_processes=N_CPUS,
        use_threading=False,
        wait_time_between_fork=0,
    ):
        """Constructor.

        :param discipline: the discipline for which the Jacobian approximation
            shall be made
        :param approx_method: "complex_step" or "finite_differences"
        :param step: the step for finite differences or complex step
        :param parallel: if True, executes in parallel
        :param n_processes: maximum number of processors on which to run
        :param use_threading: if True, use threads instead of processes to
            parallelize the execution. Multiprocessing will copy (serialize)
            all the disciplines, while threading will share all the memory.
            This is important to note if you want to execute the same
            discipline multiple times; in that case, use multiprocessing
        :param wait_time_between_fork: time waited between two forks of the
            process/thread
        """
        from gemseo.core.function import MDOFunctionGenerator

        self.discipline = discipline
        self.approx_method = approx_method
        self.step = step
        self.generator = MDOFunctionGenerator(discipline)
        self.func = None
        self.approximator = None
        self.auto_steps = {}
        self.__par_args = {
            "n_processes": n_processes,
            "use_threading": use_threading,
            "wait_time_between_fork": wait_time_between_fork,
        }
        self.__parallel = parallel

    def _create_approximator(self, outputs, inputs):
        """Create the approximation class for the function Jacobian.

        :param outputs: outputs to be differentiated
        :param inputs: inputs with respect to which to differentiate
        """
        self.func = self.generator.get_function(
            input_names_list=inputs, output_names_list=outputs
        )
        if self.approx_method == self.FINITE_DIFFERENCES:
            self.approximator = FirstOrderFD(
                self.func, self.step, self.__parallel, **self.__par_args
            )
        elif self.approx_method == self.COMPLEX_STEP:
            self.approximator = ComplexStep(
                self.func, self.step, self.__parallel, **self.__par_args
            )
        else:
            raise Exception(
                "Unknown Jacobian approximation method " + str(self.approx_method)
            )

    def auto_set_step(
        self, outputs, inputs, print_errors=True, numerical_error=EPSILON
    ):
        """Compute the optimal step for a forward first order finite
        differences gradient approximation.

        Requires a first evaluation of the perturbed functions values.
        The optimal step is reached when the truncation error (cut in the
        Taylor development) and the numerical cancellation error (roundoff
        when doing f(x+step)-f(x)) are equal.

        See:
        https://en.wikipedia.org/wiki/Numerical_differentiation
        and "Numerical Algorithms and Digital Representation", Knut Morken,
        Chapter 11, "Numerical Differentiation"

        :param outputs: the list of outputs to differentiate
        :param inputs: the list of inputs with respect to which to differentiate
        :param print_errors: if True, logs the cancellation and truncation
            error estimates
        :param numerical_error: numerical error associated with the calculation
            of f. By default the machine epsilon (approximately 1e-16), but it
            can be higher when the calculation of f requires a numerical
            resolution
        :returns: the error estimates and the optimal steps by input name
        """
        self._create_approximator(outputs, inputs)
        old_cache_tol = self.discipline.cache_tol
        self.discipline.cache_tol = 0.0
        x_vect = self._prepare_xvect(inputs, self.discipline.default_inputs)
        compute_opt_step = self.approximator.compute_optimal_step
        steps_opt, errors = compute_opt_step(x_vect, numerical_error=numerical_error)
        if print_errors:
            LOGGER.info(
                "Set optimal step for finite differences. "
                "Estimated approximation errors ="
            )
            LOGGER.info(errors)
        self.discipline.cache_tol = old_cache_tol
        data = self.discipline.local_data
        data_sizes = {key: val.size for key, val in data.items()}
        self.auto_steps = DataConversion.array_to_dict(steps_opt, inputs, data_sizes)
        return errors, self.auto_steps

    def _prepare_xvect(self, inputs, data=None):
        """Build the flat input vector from a data dictionary.

        :param inputs: inputs with respect to which to differentiate
        :param data: the data dictionary from which to extract the inputs;
            by default, the discipline local data
        """
        if data is None:
            data = self.discipline.local_data
        x_vect = DataConversion.dict_to_array(data, inputs)
        return x_vect

    def compute_approx_jac(self, outputs, inputs):
        """Compute the approximate Jacobian.

        :param outputs: outputs to be differentiated
        :param inputs: inputs with respect to which to differentiate
        """
        self._create_approximator(outputs, inputs)
        old_cache_tol = self.discipline.cache_tol
        self.discipline.cache_tol = 0.0
        local_data = self.discipline.local_data
        x_vect = self._prepare_xvect(inputs)

        if (
            self.auto_steps is not None
            and array([key in self.auto_steps for key in inputs]).all()
        ):
            step = concatenate([self.auto_steps[key] for key in inputs])
        else:
            step = self.step
        # a vector-valued step must match the size of the design vector
        if hasattr(step, "__len__") and 1 < len(step) != len(x_vect):
            raise ValueError(
                "Inconsistent step size, expected "
                + str(len(x_vect))
                + ", got "
                + str(len(step))
            )
        flat_jac = self.approximator.f_gradient(x_vect, step)
        flat_jac = atleast_2d(flat_jac)
        data_sizes = {key: len(local_data[key]) for key in chain(inputs, outputs)}
        self.discipline.cache_tol = old_cache_tol
        return DataConversion.jac_2dmat_to_dict(flat_jac, outputs, inputs, data_sizes)

    def check_jacobian(
        self,
        analytic_jacobian,
        outputs,
        inputs,
        discipline,
        threshold=1e-8,
        plot_result=False,
        file_path="jacobian_errors.pdf",
        show=False,
        figsize_x=10,
        figsize_y=10,
    ):
        """Check if the Jacobian provided by the linearize() method is correct.

        :param analytic_jacobian: Jacobian to validate
        :param outputs: list of outputs to differentiate
        :param inputs: list of inputs with respect to which to differentiate
        :param discipline: the discipline whose Jacobian is checked
        :param threshold: acceptance threshold for the Jacobian error
            (Default value = 1e-8)
        :param plot_result: plot the result of the validation (computed
            and approximate Jacobians)
        :param file_path: path to the output file if plot_result is True
        :param show: if True, open the figure
        :param figsize_x: x size of the figure in inches
        :param figsize_y: y size of the figure in inches
        :returns: True if the check is accepted, False otherwise
        """
        approx_jac_complete = self.compute_approx_jac(outputs, inputs)
        name = discipline.name
        succeed = True
        for out_data, apprx_jac_dict in approx_jac_complete.items():
            for in_data, approx_jac in apprx_jac_dict.items():
                computed_jac = analytic_jacobian[out_data][in_data]
                if approx_jac.shape != computed_jac.shape:
                    succeed = False
                    msg = name + " Jacobian: dp " + str(out_data) + "/dp "
                    msg += str(in_data) + " is of wrong shape! "
                    msg += "Got: " + str(computed_jac.shape)
                    msg += " while expected: " + str(approx_jac.shape)
                    LOGGER.error(msg)
                else:
                    success_loc = allclose(
                        computed_jac, approx_jac, atol=threshold, rtol=threshold
                    )
                    if not success_loc:
                        err = amax(
                            divide(
                                absolute(computed_jac - approx_jac),
                                absolute(approx_jac) + 1.0,
                            )
                        )
                        msg = name + " Jacobian: dp " + str(out_data)
                        msg += "/dp " + str(in_data) + " is wrong by "
                        msg += str(err * 100.0) + "%"
                        LOGGER.error(msg)
                        msg = "Approximate jacobian = \n" + str(approx_jac)
                        LOGGER.info(msg)
                        msg = "Provided by linearize method = \n"
                        msg += str(computed_jac)
                        LOGGER.info(msg)
                        msg = "Difference of jacobians = \n"
                        msg += str(approx_jac - computed_jac)
                        LOGGER.info(msg)
                        succeed = succeed and success_loc
                    else:
                        LOGGER.info(
                            "Jacobian: dp %s/dp %s succeeded!",
                            str(out_data),
                            str(in_data),
                        )
        if succeed:
            LOGGER.info("Linearization of MDODiscipline: %s is correct!", str(name))
        else:
            LOGGER.info("Linearization of MDODiscipline: %s is wrong!", str(name))

        if plot_result:
            self.plot_jac_errors(
                analytic_jacobian,
                approx_jac_complete,
                file_path,
                show,
                figsize_x,
                figsize_y,
            )
        return succeed

    @staticmethod
    def __format_jac_as_grad_dict(computed_jac, approx_jac):
        """Format the approximate Jacobian dict as a dict of gradients.

        :param computed_jac: reference computed Jacobian, as a dict of dicts
        :param approx_jac: approximate Jacobian, as a dict of dicts
        :returns: the computed gradients dict, the approximate gradients dict
            and the design variable names
        """
        approx_grad_dict = {}
        computed_grad_dict = {}
        in_names = None
        for out_data, apprx_jac_dict in approx_jac.items():
            com_jac_dict = computed_jac[out_data]
            approx_grad = []
            computed_grad = []
            if in_names is None:
                in_names = list(iter(computed_jac[out_data].keys()))
                x_names = [
                    inp + "_" + str(i + 1)
                    for inp in in_names
                    for i in xrange(apprx_jac_dict[inp].shape[1])
                ]
            for in_data in in_names:
                approx_grad.append(apprx_jac_dict[in_data])
                computed_grad.append(com_jac_dict[in_data])
            approx_grad = concatenate(approx_grad, axis=1)
            computed_grad = concatenate(computed_grad, axis=1)
            n_f, _ = approx_grad.shape
            if n_f == 1:
                approx_grad_dict[out_data] = approx_grad.flatten()
                computed_grad_dict[out_data] = computed_grad.flatten()
            else:
                for i in xrange(n_f):
                    out_name = out_data + "_" + str(i)
                    approx_grad_dict[out_name] = approx_grad[i, :]
                    computed_grad_dict[out_name] = computed_grad[i, :]
        return computed_grad_dict, approx_grad_dict, x_names

    def plot_jac_errors(
        self,
        computed_jac,
        approx_jac,
        file_path="jacobian_errors.pdf",
        show=False,
        figsize_x=10,
        figsize_y=10,
    ):
        """Generate a plot of the exact vs approximate Jacobian.

        :param computed_jac: computed Jacobian from the linearize method
        :param approx_jac: finite differences approximate Jacobian
        :param file_path: path to the output file if plot_result is True
        :param show: if True, open the figure
        :param figsize_x: x size of the figure in inches
        :param figsize_y: y size of the figure in inches
        """
        comp_grad, app_grad, x_labels = self.__format_jac_as_grad_dict(
            computed_jac, approx_jac
        )
        n_funcs = len(app_grad)
        if n_funcs == 0:
            raise ValueError("No gradients to plot!")
        nrows = n_funcs // 2
        if 2 * nrows < n_funcs:
            nrows += 1
        ncols = 2
        fig, axes = pyplot.subplots(
            nrows=nrows,
            ncols=2,
            sharex=True,
            sharey=False,
            figsize=(figsize_x, figsize_y),
        )
        i = 0
        j = -1

        axes = atleast_2d(axes)
        n_subplots = len(axes) * len(axes[0])
        abscissa = arange(len(x_labels))
        for func, grad in sorted(comp_grad.items()):
            j += 1
            if j == ncols:
                j = 0
                i += 1
            axe = axes[i][j]
            axe.plot(abscissa, grad, "bo")
            axe.plot(abscissa, app_grad[func], "ro")
            axe.set_title(func)
            axe.set_xticklabels(x_labels, fontsize=14)
            axe.set_xticks(abscissa)
            for tick in axe.get_xticklabels():
                tick.set_rotation(90)

            # Update y labels spacing
            vis_labels = [
                label
                for label in axe.get_yticklabels()
                if label.get_visible() is True
            ]
            pyplot.setp(vis_labels[::2], visible=False)

        if len(comp_grad.items()) < n_subplots:
            # xlabel must be written with the same fontsize on the 2 columns
            j += 1
            axe = axes[i][j]
            axe.set_xticklabels(x_labels, fontsize=14)
            axe.set_xticks(abscissa)
            for tick in axe.get_xticklabels():
                tick.set_rotation(90)

        fig.suptitle(
            "Computed and approximate derivatives. "
            "Blue = computed, red = approximated derivatives",
            fontsize=14,
        )
        if file_path is not None:
            pyplot.savefig(file_path)
        if show:
            pyplot.show()
        return fig
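

# Editor-added usage sketch: approximating the Jacobian of a discipline.
# Here "disc" stands for any already-instantiated MDODiscipline; the input
# name "x" and the output name "y" are assumptions made purely for
# illustration and must be replaced by the discipline's actual data names.
def _example_discipline_jac_approx(disc):
    """Approximate d y / d x for a discipline with input "x" and output "y"."""
    apprx = DisciplineJacApprox(disc, step=1e-7)
    # Optionally balance truncation and cancellation errors to tune the steps.
    errors, steps = apprx.auto_set_step(["y"], ["x"], print_errors=False)
    # Nested dict of 2D arrays: jac["y"]["x"][i, j] approximates d y_i / d x_j.
    jac = apprx.compute_approx_jac(["y"], ["x"])
    return jac, steps, errors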


def comp_best_step(f_p, f_x, f_m, step, epsilon_mach=EPSILON):
    """Compute the optimal step for a forward first order finite differences
    gradient approximation.

    Requires a first evaluation of the perturbed functions values.
    The optimal step is reached when the truncation error (cut in the
    Taylor development) and the numerical cancellation error (roundoff
    when doing f(x+step)-f(x)) are equal.

    See:
    https://en.wikipedia.org/wiki/Numerical_differentiation
    and "Numerical Algorithms and Digital Representation", Knut Morken,
    Chapter 11, "Numerical Differentiation"

    :param f_p: f(x+step)
    :param f_x: f(x)
    :param f_m: f(x-step)
    :param step: step used for the calculations of the perturbed functions
        values
    :param epsilon_mach: machine epsilon
    :returns: trunc_error, cancel_error, optimal step
    """
    hess = approx_hess(f_p, f_x, f_m, step)
    if abs(hess) < 1e-10:
        LOGGER.debug(
            "Hessian approximation is too small, cannot compute the optimal step!"
        )
        return None, None, step

    opt_step = 2 * (epsilon_mach * abs(f_x) / abs(hess)) ** 0.5
    trunc_error = compute_truncature_error(hess, step)
    cancel_error = compute_cancellation_error(f_x, opt_step)
    return trunc_error, cancel_error, opt_step


def compute_truncature_error(hess, step):
    """Compute the truncation error estimate for a first order finite
    differences scheme.

    :param hess: second order derivative (d²f/dx²)
    :param step: step of the finite differences used for the derivatives
        approximation
    :returns: trunc_error, the truncation error
    """
    trunc_error = abs(hess) * step / 2
    return trunc_error


def compute_cancellation_error(f_x, step, epsilon_mach=EPSILON):
    """Compute the cancellation error, i.e. the roundoff when doing
    f(x+step)-f(x).

    :param f_x: value of the function at the current point
    :param step: step used for the calculations of the perturbed functions
        values
    :param epsilon_mach: machine epsilon
    :returns: the cancellation error
    """
    epsa = epsilon_mach * abs(f_x)
    cancel_error = 2 * epsa / step
    return cancel_error


def approx_hess(f_p, f_x, f_m, step):
    """Second order approximation of the Hessian (d²f/dx²).

    :param f_p: f(x+step)
    :param f_x: f(x)
    :param f_m: f(x-step)
    :param step: step used for the calculations of the perturbed functions
        values
    :returns: Hessian approximation
    """
    hess = (f_p - 2 * f_x + f_m) / (step ** 2)
    return hess
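

# Editor-added worked example: for f(x) = exp(x) at x = 1, |f| = |f''| = e, so
# the truncation error |f''|*h/2 and the cancellation error 2*eps*|f|/h balance
# at h_opt = 2*sqrt(eps*|f|/|f''|) = 2*sqrt(eps), about 3e-8, which
# comp_best_step recovers from three function evaluations. The values below are
# illustrative only.
def _example_comp_best_step():
    """Recover the optimal forward-difference step for exp(x) at x = 1."""
    from numpy import exp

    step = 1e-5
    x_0 = 1.0
    f_x = exp(x_0)
    f_p = exp(x_0 + step)
    f_m = exp(x_0 - step)
    trunc_error, cancel_error, opt_step = comp_best_step(f_p, f_x, f_m, step)
    # opt_step is close to 2*sqrt(EPSILON) since |f(x)| == |f''(x)| here.
    return trunc_error, cancel_error, opt_step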