# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
# Contributors:
# INITIAL AUTHORS - API and implementation and/or documentation
# :author: Francois Gallard
# OTHER AUTHORS - MACROSCOPIC CHANGES
"""Self Organizing Maps to display high dimensional design spaces."""
from __future__ import division, unicode_literals
import logging
from math import floor, sqrt
import matplotlib
from numpy import array, bincount, float64, int32, isnan, logical_not
from numpy import max as np_max
from numpy import mean, mgrid
from numpy import min as np_min
from numpy import ndarray, nonzero, unique, where, zeros
from pylab import plt
from gemseo.algos.opt_problem import OptimizationProblem
from gemseo.post.core.colormaps import PARULA
from gemseo.post.opt_post_processor import OptPostProcessor
from gemseo.third_party import sompy
LOGGER = logging.getLogger(__name__)
[docs]class SOM(OptPostProcessor):
"""Self organizing map clustering optimization history.
Options of the plot method are the x- and y- numbers of cells in the SOM.
"""
DEFAULT_FIG_SIZE = (12.0, 18.0)
def __init__(
self,
opt_problem, # type: OptimizationProblem
): # type: (...) -> None
super(SOM, self).__init__(opt_problem)
self.som = None
self.cmap = PARULA
@staticmethod
def __build_som_from_vars(
x_vars, # type: ndarray
som_grid_nx=5, # type:int
som_grid_ny=5, # type:int
initmethod="pca", # type:str
verbose="off", # type:str
): # type: (...) -> SOM
"""Builds the SOM from the design variables history.
Args:
x_vars: The design variables history (n_iter,n_dv).
som_grid_nx: The number of neurons in the x direction.
som_grid_ny: The number of neurons in the y direction.
initmethod: The initialization method for the SOM.
verbose: The verbose for SOM training.
Returns:
The self organizing map
"""
LOGGER.info("Building Self Organizing Map from optimization history:")
LOGGER.info(" Number of neurons in x direction = %s", str(som_grid_nx))
LOGGER.info(" Number of neurons in y direction = %s", str(som_grid_ny))
var_som = sompy.SOM(
"som",
x_vars,
mapsize=[som_grid_ny + 1, som_grid_nx + 1],
norm_method="var",
initmethod=initmethod,
)
var_som.init_map()
var_som.train(n_job=1, shared_memory="no", verbose=verbose)
return var_som
def _plot(
self,
n_x=4, # type: int
n_y=4, # type: int
annotate=False, # type: bool
): # type: (...) -> None
"""
Args:
n_x: The number of grids in x.
n_y: The number of grids in y.
annotate: If True, add label of neuron value to SOM plot.
"""
criteria_list = [
self.opt_problem.get_objective_name()
] + self.opt_problem.get_constraints_names()
all_data = self.database.get_all_data_names()
# Ensure that the data is available in the database
for crit in criteria_list:
if crit not in all_data:
criteria_list.remove(crit)
figure = plt.figure(figsize=self.DEFAULT_FIG_SIZE)
figure.suptitle("Self Organizing Maps of the design space", fontsize=14)
subplot_number = 0
self.__compute(n_x, n_y)
for criteria in criteria_list:
f_hist, _ = self.database.get_complete_history(
["SOM_i", "SOM_j", "SOM_indx", criteria]
)
if isinstance(f_hist[0][3], ndarray):
dim_val = f_hist[0][3].size
for _ in range(dim_val):
subplot_number += 1
else:
subplot_number += 1
grid_size_x = 3
grid_size_y = subplot_number // grid_size_x
if (subplot_number % grid_size_x) > 0:
grid_size_y += 1
fig_indx = 1
for criteria in criteria_list:
f_hist, _ = self.database.get_complete_history(
["SOM_i", "SOM_j", "SOM_indx", criteria]
)
if isinstance(f_hist[0][3], ndarray):
dim_val = f_hist[0][3].size
for k in range(dim_val):
f_hist_scalar = []
for f_h in f_hist:
scal_list = f_h[0:3]
scal_list.append(f_h[3][k])
f_hist_scalar.append(scal_list)
criteria_name = criteria + "_" + str(k)
self.__plot_som_from_scalar_data(
f_hist_scalar,
criteria_name,
fig_indx,
grid_size_x=grid_size_x,
grid_size_y=grid_size_y,
annotate=annotate,
)
fig_indx += 1
else:
self.__plot_som_from_scalar_data(
f_hist,
criteria,
fig_indx,
grid_size_x=grid_size_x,
grid_size_y=grid_size_y,
annotate=annotate,
)
fig_indx += 1
self._add_figure(figure)
def __plot_som_from_scalar_data(
self,
f_hist_scalar, # type: ndarray
criteria, # type: str
fig_indx, # type: int
grid_size_x=3, # type: int
grid_size_y=20, # type: int
annotate=False, # type: bool
):
"""Builds the SOM plot after computation for a given criteria.
Args:
criteria: The criteria to show.
f_hist_scalar: The scalar data to show.
fig_indx: The axe index in the figure.
grid_size_x: The number of SOMs in the grid on the x axis.
grid_size_y: The number of SOMs in the grid on the y axis.
annotate: If True, add label with average value of neural.
"""
f_hist = array(f_hist_scalar).T.real
unique_ind = unique(f_hist[2, :])
average = {}
for _, som_id in enumerate(unique_ind):
where_somid = where(f_hist[2, :] == som_id)[0]
ranges_of_uniques = f_hist[3, where_somid]
average[som_id] = mean(ranges_of_uniques)
ijshape = array((np_max(f_hist[0, :]), np_max(f_hist[1, :])), dtype=int32)
mat_ij = zeros(ijshape, dtype=float64)
mat_ij[:, :] = float("nan")
for itr in range(f_hist.shape[-1]):
i, j, somindx, _ = f_hist[:, itr]
mat_ij[int(i) - 1, int(j) - 1] = average[somindx]
empty = isnan(mat_ij)
non_empty = logical_not(empty)
axe = plt.subplot(grid_size_y, grid_size_x, fig_indx)
minv = np_min(mat_ij[non_empty])
maxv = np_max(mat_ij[non_empty])
self.out_data_dict[fig_indx] = mat_ij
im1 = axe.imshow(
mat_ij,
vmin=minv - 0.01 * abs(minv),
vmax=maxv + 0.01 * abs(maxv),
cmap=self.cmap,
interpolation="nearest",
aspect="auto",
) # "spectral" "hot" "RdBu_r"
if annotate:
crit_format = "%1.2g"
for i in range(mat_ij.shape[0]):
for j in range(mat_ij.shape[0]):
_ = axe.text(
j,
i,
crit_format % mat_ij[i, j],
ha="center",
va="center",
color="w",
fontsize=7,
)
axe.set_title(criteria, fontsize=12)
cax, kwa = matplotlib.colorbar.make_axes([axe])
plt.colorbar(im1, cax=cax, **kwa)
im1.axes.get_xaxis().set_visible(False)
im1.axes.get_yaxis().set_visible(False)
return axe
def __compute(
self,
som_grid_nx=5, # type: int
som_grid_ny=5, # type: int
):
"""Build the SOM from optimization history.
Args:
som_grid_nx: The number of neurons in the x direction.
som_grid_ny: The number of neurons in the y direction.
"""
x_history = self.database.get_x_history()
x_vars = array(x_history).real
self.som = self.__build_som_from_vars(x_vars, som_grid_nx, som_grid_ny)
som_cluster_index = self.som.project_data(x_vars)
som_coord = array(self.som.ind_to_xy(som_cluster_index), dtype=int32)
coord_2d_offset = self.__coord2d_to_coords_offsets(som_coord)
self.out_data_dict["SOM"] = coord_2d_offset
for i, x_vars in enumerate(x_history):
self.database.store(
x_vars,
{
"SOM_indx": som_cluster_index[i],
"SOM_i": som_coord[i, 0],
"SOM_j": som_coord[i, 1],
"SOM_x": coord_2d_offset[i, 0],
"SOM_y": coord_2d_offset[i, 1],
},
)
@staticmethod
def __coord2d_to_coords_offsets(
som_coord, # type: ndarray
max_ofset=0.6, # type: float
): # type: (...) -> ndarray
"""Takes a coord array from SOM and adds an offset to the coordinates of the
elements in the cluster so that they can be distinguished at display.
Args:
som_coord: The SOM coordinates.
max_ofset: The maximum offset of the grid.
Returns:
The coordinates.
"""
coord_2d = som_coord[:, :2]
coord_2d_offset = array(coord_2d, dtype=float64)
coord_indx = som_coord[:, -1]
y_vars = bincount(coord_indx)
i = nonzero(y_vars)[0]
uniques_occ = array(list(zip(i, y_vars[i])))
unique_indx = uniques_occ[:, 0]
max_occ = np_max(uniques_occ[:, 1])
max_subarr_size = floor(sqrt(max_occ)) + 1
dxdy_max = max_ofset / (max_subarr_size - 1)
for grp in unique_indx:
inds_of_grp = where(coord_indx == grp)[0]
subarr_size = sqrt(len(inds_of_grp))
if floor(subarr_size) < subarr_size:
subarr_size = floor(subarr_size) + 1
else:
subarr_size = floor(subarr_size)
# Otherwise single individual then no need to build a grid
if subarr_size > 1:
grid = mgrid[0:subarr_size, 0:subarr_size] * dxdy_max
gridx = grid[0, :, :].flatten()
gridy = grid[1, :, :].flatten()
for k, ind_in_grp in enumerate(inds_of_grp):
coord_2d_offset[ind_in_grp, 0] += gridx[k]
coord_2d_offset[ind_in_grp, 1] += gridy[k]
return coord_2d_offset