Source code for gemseo.algos.opt.augmented_lagrangian.base_augmented_lagrangian

# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
"""An implementation of the augmented lagrangian algorithm."""

from __future__ import annotations

import logging
from abc import abstractmethod
from copy import deepcopy
from typing import TYPE_CHECKING
from typing import Any
from typing import Final

from numpy import atleast_1d
from numpy import concatenate
from numpy import inf
from numpy import zeros_like
from numpy.linalg import norm
from numpy.ma import allequal

from gemseo.algos.aggregation.aggregation_func import aggregate_positive_sum_square
from gemseo.algos.aggregation.aggregation_func import aggregate_sum_square
from gemseo.algos.opt.base_optimization_library import BaseOptimizationLibrary
from gemseo.algos.opt.factory import OptimizationLibraryFactory
from gemseo.algos.optimization_problem import OptimizationProblem

if TYPE_CHECKING:
    from collections.abc import Iterable

    from gemseo.algos.optimization_result import OptimizationResult
    from gemseo.core.mdo_functions.mdo_function import MDOFunction
    from gemseo.typing import NumberArray
    from gemseo.typing import StrKeyMapping

LOGGER = logging.getLogger(__name__)


[docs] class BaseAugmentedLagrangian(BaseOptimizationLibrary): """This is an abstract base class for augmented lagrangian optimization algorithms. The abstract methods :func:`_update_penalty` and :func:`_update_lagrange_multipliers` need to be implemented by derived classes. """ __n_obj_func_calls: int """The total number of objective function calls.""" __INITIAL_RHO: Final[str] = "initial_rho" """The name of the option for `initial_rho` parameter.""" __SUB_PROBLEM_CONSTRAINTS: Final[str] = "sub_problem_constraints" """The name of the option that corresponds to sub problem constraints.""" _rho: float """The penalty value.""" _function_outputs: dict[str, float | NumberArray] """The current iteration function outputs.""" _sub_problems: list[OptimizationProblem] """The sub problems appended in the sequence of optimization problem.""" def __init__(self, algo_name: str) -> None: # noqa:D107 super().__init__(algo_name) self.__n_obj_func_calls = 0 self._function_outputs = {} self._sub_problems = [] @property def n_obj_func_calls(self) -> int: """The total number of objective function calls.""" return self.__n_obj_func_calls def _run(self, problem: OptimizationProblem, **settings: Any) -> tuple[str, Any]: self._rho = settings[self.__INITIAL_RHO] self._update_options_callback = settings["update_options_callback"] problem_ineq_constraints = [ constr for constr in problem.constraints.get_inequality_constraints() if constr.name not in settings[self.__SUB_PROBLEM_CONSTRAINTS] ] problem_eq_constraints = [ constr for constr in problem.constraints.get_equality_constraints() if constr.name not in settings[self.__SUB_PROBLEM_CONSTRAINTS] ] current_value = self._problem.design_space.get_current_value( normalize=self._normalize_ds ) eq_multipliers = { h.name: zeros_like(h.evaluate(current_value)) for h in problem_eq_constraints } ineq_multipliers = { g.name: zeros_like(g.evaluate(current_value)) for g in problem_ineq_constraints } active_constraint_residual = inf x = self._problem.design_space.get_current_value() message = None for iteration in range(settings[self._MAX_ITER]): LOGGER.debug("iteration: %s", iteration) LOGGER.debug( "inequality Lagrange multiplier approximations: %s", ineq_multipliers ) LOGGER.debug( "equality Lagrange multiplier approximations: %s", eq_multipliers ) LOGGER.debug("Active constraint residual: %s", active_constraint_residual) LOGGER.debug("penalty: %s", self._rho) # Get the next design candidate solving the sub-problem. f_calls_sub_prob, x_new = self.__solve_sub_problem( eq_multipliers, ineq_multipliers, self._normalize_ds, settings[self.__SUB_PROBLEM_CONSTRAINTS], settings["sub_algorithm_name"], settings["sub_algorithm_settings"], x, ) self.__n_obj_func_calls += f_calls_sub_prob (_, hv, vk) = ( self.__compute_objective_function_and_active_constraint_residual( ineq_multipliers, problem_eq_constraints, problem_ineq_constraints, x_new, ) ) self._rho = self._update_penalty( constraint_violation_current_iteration=max(norm(vk), norm(hv)), objective_function_current_iteration=self._function_outputs[ self._problem.objective.name ], constraint_violation_previous_iteration=active_constraint_residual, current_penalty=self._rho, iteration=iteration, **settings, ) # Update the active constraint residual. active_constraint_residual = max(norm(vk), norm(hv)) self._update_lagrange_multipliers(eq_multipliers, ineq_multipliers, x_new) has_converged, message = self._check_termination_criteria( x_new, x, eq_multipliers, ineq_multipliers ) if has_converged: break x = x_new return message, None def _post_run( self, problem: OptimizationProblem, result: OptimizationResult, max_design_space_dimension_to_log: int, **settings: Any, ) -> None: result.n_obj_call = self.__n_obj_func_calls super()._post_run( problem, result, max_design_space_dimension_to_log, **settings ) @staticmethod def _check_termination_criteria( x_new: NumberArray, x: NumberArray, eq_lag: dict[str, NumberArray], ineq_lag: dict[str, NumberArray], ) -> tuple[bool, str]: """Check if the termination criteria are satisfied. Args: x_new: The new design vector. x: The old design vector. eq_lag: The equality constraint lagrangian multipliers. ineq_lag: The inequality constraint lagrangian multipliers. Returns: Whether the termination criteria are satisfied and the convergence message. """ if len(eq_lag) + len(ineq_lag) == 0: return True, "The sub solver dealt with the constraints." if allequal(x_new, x): return True, "The solver stopped proposing new designs." return False, "Maximun number of iterations reached." def __compute_objective_function_and_active_constraint_residual( self, mu0: dict[str, NumberArray], problem_eq_constraints: Iterable[MDOFunction], problem_ineq_constraints: Iterable[MDOFunction], x_opt: NumberArray, ) -> tuple[float | NumberArray, NumberArray | Iterable, NumberArray | Iterable]: """Compute the objective function and active constraint residuals. Args: mu0: The lagrangian multipliers for inequality constraints. problem_eq_constraints: The optimization problem equality constraints dealt with Augmented Lagrangian. problem_ineq_constraints: The optimization problem inequality constraints dealt with Augmented Lagrangian. x_opt: The current design variable vector. Returns: The objective function value, the equality constraint violation value, the active inequality constraint residuals. """ self._problem.design_space.set_current_value(x_opt) require_gradient = self.ALGORITHM_INFOS[self.algo_name].require_gradient output_functions, jacobian_functions = self._problem.get_functions( jacobian_names=() if require_gradient else None, ) self._function_outputs, _ = self._problem.evaluate_functions( output_functions=output_functions or None, jacobian_functions=jacobian_functions or None, ) f_opt = self._function_outputs[self._problem.objective.name] gv = [ atleast_1d(self._function_outputs[constr.name]) for constr in problem_ineq_constraints ] hv = [ atleast_1d(self._function_outputs[constr.name]) for constr in problem_eq_constraints ] mu_vector = [ atleast_1d(mu0[constr.name]) for constr in problem_ineq_constraints ] vk = [ -g_i * (-g_i <= mu / self._rho) + mu / self._rho * (-g_i > mu / self._rho) for g_i, mu in zip(gv, mu_vector) ] vk = concatenate(vk) if vk else vk hv = concatenate(hv) if hv else hv return f_opt, hv, vk @staticmethod def _check_for_preconditioner(sub_algorithm_settings: StrKeyMapping) -> None: """Check if 'precond' is in sub_algorithm_settings and log if detected.""" if sub_algorithm_settings and "precond" in sub_algorithm_settings: LOGGER.info("Preconditioner Detected") def __solve_sub_problem( self, lambda0: dict[str, NumberArray], mu0: dict[str, NumberArray], normalize: bool, sub_problem_constraints: Iterable[str], sub_algorithm_name: str, sub_algorithm_settings: StrKeyMapping, x_init: NumberArray, ) -> tuple[int, NumberArray]: """Solve the sub-problem. Args: lambda0: The lagrangian multipliers for equality constraints. mu0: The lagrangian multipliers for inequality constraints. normalize: Whether to normalize the design space. sub_problem_constraints: The constraints to keep in the sub-problem. If ``empty`` all constraints are dealt by the Augmented Lagrange, which means that the sub-problem is unconstrained. sub_algorithm_name: The name of the optimization algorithm used to solve each sub-poblem. sub_algorithm_settings: The settings of the sub-problem optimization solver. x_init: The design variable vector at the current iteration. Returns: The updated number of function call and the new design variable vector. """ # Get the sub problem. lagrangian = self.__get_lagrangian_function(lambda0, mu0, self._rho) dspace = deepcopy(self._problem.design_space) dspace.set_current_value(x_init) sub_problem = OptimizationProblem(dspace) sub_problem.objective = lagrangian for constraint in self._problem.constraints.get_originals(): if constraint.name in sub_problem_constraints: sub_problem.constraints.append(constraint) sub_problem.preprocess_functions(is_function_input_normalized=normalize) if self._update_options_callback is not None: self._update_options_callback(self._sub_problems, sub_algorithm_settings) self._check_for_preconditioner(sub_algorithm_settings) # Solve the sub-problem. opt = OptimizationLibraryFactory().execute( sub_problem, algo_name=sub_algorithm_name, **sub_algorithm_settings ) self._sub_problems.append(sub_problem) return sub_problem.objective.n_calls, opt.x_opt @abstractmethod def _update_penalty( self, constraint_violation_current_iteration: NumberArray | float, objective_function_current_iteration: NumberArray | float, constraint_violation_previous_iteration: NumberArray | float, current_penalty: NumberArray | float, iteration: int, **options: Any, ) -> float: """Update the penalty. This method must be implemented in a derived class in order to compute the penalty coefficient at each iteration of the Augmented Lagrangian algorithm. Args: objective_function_current_iteration: The objective function value at the current iteration. constraint_violation_current_iteration: The maximum constraint violation at the current iteration. constraint_violation_previous_iteration: The maximum constraint violation at the previous iteration. current_penalty: The penalty value at the current iteration. iteration: The iteration number. **options: The other options of the update penalty method. Returns: The updated penalty value. """ @abstractmethod def _update_lagrange_multipliers( self, eq_lag: dict[str, NumberArray], ineq_lag: dict[str, NumberArray], x_opt: NumberArray, ) -> None: """Update the lagrange multipliers. This method must be implemented in a derived class in order to compute the lagrange multipliers at each iteration of the Augmented Lagrangian algorithm. Args: eq_lag: The lagrange multipliers for equality constraints. ineq_lag: The lagrange multipliers for inequality constraints. x_opt: The current design variables vector. """ def __get_lagrangian_function( self, eq_lag: dict[str, NumberArray], ineq_lag: dict[str, NumberArray], rho: float, ) -> MDOFunction: """Return the lagrangian function. Args: eq_lag: The lagrangian multipliers for equality constraints. ineq_lag: The lagrangian multipliers for inequality constraints. rho: The penalty. Returns: The lagrangian function. """ lagrangian = self._problem.objective.original for constr in self._problem.constraints.get_originals(): if constr.name in ineq_lag: lagrangian += aggregate_positive_sum_square( constr + ineq_lag[constr.name] / rho, scale=rho / 2 ) if constr.name in eq_lag: lagrangian += aggregate_sum_square( constr + eq_lag[constr.name] / rho, scale=rho / 2 ) return lagrangian