Source code for gemseo.post.som

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#        :author: Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""Self Organizing Maps to display high dimensional design spaces."""

from __future__ import division, unicode_literals

import logging
from math import floor, sqrt

import matplotlib
from numpy import array, bincount, float64, int32, isnan, logical_not
from numpy import max as np_max
from numpy import mean, mgrid
from numpy import min as np_min
from numpy import ndarray, nonzero, unique, where, zeros
from pylab import plt

from gemseo.algos.opt_problem import OptimizationProblem
from gemseo.post.core.colormaps import PARULA
from gemseo.post.opt_post_processor import OptPostProcessor
from gemseo.third_party import sompy

LOGGER = logging.getLogger(__name__)


[docs]class SOM(OptPostProcessor): """Self organizing map clustering optimization history. Options of the plot method are the x- and y- numbers of cells in the SOM. """ DEFAULT_FIG_SIZE = (12.0, 18.0) def __init__( self, opt_problem, # type: OptimizationProblem ): # type: (...) -> None super(SOM, self).__init__(opt_problem) self.som = None self.cmap = PARULA @staticmethod def __build_som_from_vars( x_vars, # type: ndarray som_grid_nx=5, # type:int som_grid_ny=5, # type:int initmethod="pca", # type:str verbose="off", # type:str ): # type: (...) -> SOM """Builds the SOM from the design variables history. Args: x_vars: The design variables history (n_iter,n_dv). som_grid_nx: The number of neurons in the x direction. som_grid_ny: The number of neurons in the y direction. initmethod: The initialization method for the SOM. verbose: The verbose for SOM training. Returns: The self organizing map """ LOGGER.info("Building Self Organizing Map from optimization history:") LOGGER.info(" Number of neurons in x direction = %s", str(som_grid_nx)) LOGGER.info(" Number of neurons in y direction = %s", str(som_grid_ny)) var_som = sompy.SOM( "som", x_vars, mapsize=[som_grid_ny + 1, som_grid_nx + 1], norm_method="var", initmethod=initmethod, ) var_som.init_map() var_som.train(n_job=1, shared_memory="no", verbose=verbose) return var_som def _plot( self, n_x=4, # type: int n_y=4, # type: int annotate=False, # type: bool ): # type: (...) -> None """ Args: n_x: The number of grids in x. n_y: The number of grids in y. annotate: If True, add label of neuron value to SOM plot. """ criteria_list = [ self.opt_problem.get_objective_name() ] + self.opt_problem.get_constraints_names() all_data = self.database.get_all_data_names() # Ensure that the data is available in the database for crit in criteria_list: if crit not in all_data: criteria_list.remove(crit) figure = plt.figure(figsize=self.DEFAULT_FIG_SIZE) figure.suptitle("Self Organizing Maps of the design space", fontsize=14) subplot_number = 0 self.__compute(n_x, n_y) for criteria in criteria_list: f_hist, _ = self.database.get_complete_history( ["SOM_i", "SOM_j", "SOM_indx", criteria] ) if isinstance(f_hist[0][3], ndarray): dim_val = f_hist[0][3].size for _ in range(dim_val): subplot_number += 1 else: subplot_number += 1 grid_size_x = 3 grid_size_y = subplot_number // grid_size_x if (subplot_number % grid_size_x) > 0: grid_size_y += 1 fig_indx = 1 for criteria in criteria_list: f_hist, _ = self.database.get_complete_history( ["SOM_i", "SOM_j", "SOM_indx", criteria] ) if isinstance(f_hist[0][3], ndarray): dim_val = f_hist[0][3].size for k in range(dim_val): f_hist_scalar = [] for f_h in f_hist: scal_list = f_h[0:3] scal_list.append(f_h[3][k]) f_hist_scalar.append(scal_list) criteria_name = criteria + "_" + str(k) self.__plot_som_from_scalar_data( f_hist_scalar, criteria_name, fig_indx, grid_size_x=grid_size_x, grid_size_y=grid_size_y, annotate=annotate, ) fig_indx += 1 else: self.__plot_som_from_scalar_data( f_hist, criteria, fig_indx, grid_size_x=grid_size_x, grid_size_y=grid_size_y, annotate=annotate, ) fig_indx += 1 self._add_figure(figure) def __plot_som_from_scalar_data( self, f_hist_scalar, # type: ndarray criteria, # type: str fig_indx, # type: int grid_size_x=3, # type: int grid_size_y=20, # type: int annotate=False, # type: bool ): """Builds the SOM plot after computation for a given criteria. Args: criteria: The criteria to show. f_hist_scalar: The scalar data to show. fig_indx: The axe index in the figure. grid_size_x: The number of SOMs in the grid on the x axis. grid_size_y: The number of SOMs in the grid on the y axis. annotate: If True, add label with average value of neural. """ f_hist = array(f_hist_scalar).T.real unique_ind = unique(f_hist[2, :]) average = {} for _, som_id in enumerate(unique_ind): where_somid = where(f_hist[2, :] == som_id)[0] ranges_of_uniques = f_hist[3, where_somid] average[som_id] = mean(ranges_of_uniques) ijshape = array((np_max(f_hist[0, :]), np_max(f_hist[1, :])), dtype=int32) mat_ij = zeros(ijshape, dtype=float64) mat_ij[:, :] = float("nan") for itr in range(f_hist.shape[-1]): i, j, somindx, _ = f_hist[:, itr] mat_ij[int(i) - 1, int(j) - 1] = average[somindx] empty = isnan(mat_ij) non_empty = logical_not(empty) axe = plt.subplot(grid_size_y, grid_size_x, fig_indx) minv = np_min(mat_ij[non_empty]) maxv = np_max(mat_ij[non_empty]) self.out_data_dict[fig_indx] = mat_ij im1 = axe.imshow( mat_ij, vmin=minv - 0.01 * abs(minv), vmax=maxv + 0.01 * abs(maxv), cmap=self.cmap, interpolation="nearest", aspect="auto", ) # "spectral" "hot" "RdBu_r" if annotate: crit_format = "%1.2g" for i in range(mat_ij.shape[0]): for j in range(mat_ij.shape[0]): _ = axe.text( j, i, crit_format % mat_ij[i, j], ha="center", va="center", color="w", fontsize=7, ) axe.set_title(criteria, fontsize=12) cax, kwa = matplotlib.colorbar.make_axes([axe]) plt.colorbar(im1, cax=cax, **kwa) im1.axes.get_xaxis().set_visible(False) im1.axes.get_yaxis().set_visible(False) return axe def __compute( self, som_grid_nx=5, # type: int som_grid_ny=5, # type: int ): """Build the SOM from optimization history. Args: som_grid_nx: The number of neurons in the x direction. som_grid_ny: The number of neurons in the y direction. """ x_history = self.database.get_x_history() x_vars = array(x_history).real self.som = self.__build_som_from_vars(x_vars, som_grid_nx, som_grid_ny) som_cluster_index = self.som.project_data(x_vars) som_coord = array(self.som.ind_to_xy(som_cluster_index), dtype=int32) coord_2d_offset = self.__coord2d_to_coords_offsets(som_coord) self.out_data_dict["SOM"] = coord_2d_offset for i, x_vars in enumerate(x_history): self.database.store( x_vars, { "SOM_indx": som_cluster_index[i], "SOM_i": som_coord[i, 0], "SOM_j": som_coord[i, 1], "SOM_x": coord_2d_offset[i, 0], "SOM_y": coord_2d_offset[i, 1], }, ) @staticmethod def __coord2d_to_coords_offsets( som_coord, # type: ndarray max_ofset=0.6, # type: float ): # type: (...) -> ndarray """Takes a coord array from SOM and adds an offset to the coordinates of the elements in the cluster so that they can be distinguished at display. Args: som_coord: The SOM coordinates. max_ofset: The maximum offset of the grid. Returns: The coordinates. """ coord_2d = som_coord[:, :2] coord_2d_offset = array(coord_2d, dtype=float64) coord_indx = som_coord[:, -1] y_vars = bincount(coord_indx) i = nonzero(y_vars)[0] uniques_occ = array(list(zip(i, y_vars[i]))) unique_indx = uniques_occ[:, 0] max_occ = np_max(uniques_occ[:, 1]) max_subarr_size = floor(sqrt(max_occ)) + 1 dxdy_max = max_ofset / (max_subarr_size - 1) for grp in unique_indx: inds_of_grp = where(coord_indx == grp)[0] subarr_size = sqrt(len(inds_of_grp)) if floor(subarr_size) < subarr_size: subarr_size = floor(subarr_size) + 1 else: subarr_size = floor(subarr_size) # Otherwise single individual then no need to build a grid if subarr_size > 1: grid = mgrid[0:subarr_size, 0:subarr_size] * dxdy_max gridx = grid[0, :, :].flatten() gridy = grid[1, :, :].flatten() for k, ind_in_grp in enumerate(inds_of_grp): coord_2d_offset[ind_in_grp, 0] += gridx[k] coord_2d_offset[ind_in_grp, 1] += gridy[k] return coord_2d_offset