Source code for gemseo.utils.xdsmizer

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - API and implementation and/or documentation
#       :author: Remi Lafage
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""
Generate XDSMjs input json file
*******************************
"""

from __future__ import absolute_import, division, print_function, unicode_literals

import webbrowser
from builtins import open, str
from json import dumps
from multiprocessing import RLock
from os import getcwd
from os.path import abspath, basename, join, splitext
from tempfile import mkdtemp

from future import standard_library

from gemseo.core.discipline import MDODiscipline
from gemseo.core.execution_sequence import (
    AtomicExecSequence,
    LoopExecSequence,
    ParallelExecSequence,
    SerialExecSequence,
)
from gemseo.core.mdo_scenario import MDOScenarioAdapter
from gemseo.core.monitoring import Monitoring
from gemseo.mda.mda import MDA
from gemseo.utils.locks import synchronized
from gemseo.utils.show_utils import generate_xdsm_html

from .xdsm_tikz import xdsm_dict2tex

standard_library.install_aliases()


from gemseo import LOGGER

OPT_NAME = OPT_ID = "Opt"
USER_NAME = USER_ID = "_U_"


[docs]class XDSMizer(object): """Class to build the XDSM diagram of a scenario Generates input json for XDSMjs javascript library https://github.com/OneraHub/XDSMjs See : Martins, Joaquim RRA, and Andrew B. Lambe. "Multidisciplinary design optimization: a survey of architectures." AIAA journal 51.9 (2013): 2049-2075. """ def __init__(self, scenario, hashref="root", level=0, expected_workflow=None): """ Constructor :param scenario: the MDO scenario to be represented as a XDSM diagram :param hashref: key used in the final XDSM json structure to reference {nodes, edges, workflow, optpb} data structure. Default to 'root' :param level: level number corresponding to scenario depth. Root scenario is level 0. """ self.scenario = scenario self.level = level self.hashref = hashref self.lock = RLock() self._monitor = None self.outdir = "." self.outfilename = "xdsm.json" self.to_hashref = {} self.to_id = {} # dictionary to map AtomicExecSequence to XDSM id self.initialize(expected_workflow) self.print_statuses = False # Prints the statuses in the console self.latex_output = False
[docs] def initialize(self, workflow=None): """ Initialize from a given workflow or use self.scenario' one, create sub XDSM diagram accordingly. :param workflow: composite execution sequence """ self.sub_xdsmizers = [] # Find disciplines from workflow structure if workflow: self.workflow = workflow else: self.workflow = self.scenario.get_expected_workflow() self.atoms = XDSMizer._get_single_level_atoms(self.workflow) self.to_hashref = {} level = self.level + 1 num = 1 for atom in self.atoms: if atom.discipline.is_scenario(): if atom.discipline == self.scenario: self.to_hashref[atom] = "root" self.root_atom = atom else: # sub-scenario self.to_hashref[atom] = "scn-" + str(level) + "-" + str(num) sub_workflow = XDSMizer._find_sub_workflow(self.workflow, atom) self.sub_xdsmizers.append( XDSMizer( atom.discipline, self.to_hashref[atom], level, sub_workflow ) ) num += 1
[docs] def monitor( self, outdir=".", outfilename="xdsm.json", print_statuses=False, latex_output=False, ): """Monitors |g| discipline execution by generating XDSM json file on discipline status update. :param str outdir: the directory where XDSM json file is generated :param str outfilename: the file name of the generated XDSM json file (default: xdsm.json) :param bool print_statuses: print the statuses in the console at each update (default: False) :param bool latex_output: generate tikz, tex and pdf output (default: False) """ self._monitor = Monitoring(self.scenario) self._monitor.add_observer(self) # have to reinitialize with monitored workflow self.initialize(self._monitor.workflow) self.outdir = outdir self.outfilename = outfilename self.print_statuses = print_statuses self.latex_output = latex_output
[docs] def update(self, atom): # pylint: disable=unused-argument """Callback function that generate new XDSM regarding the given atom status update :param atom: discipline which status has been updated """ self.run( outdir=self.outdir, outfilename=self.outfilename, latex_output=self.latex_output, ) if self.print_statuses: LOGGER.info(str(self._monitor))
[docs] def run( self, outdir=None, latex_output=False, outfilename="xdsm.html", html_output=True, json_output=False, open_browser=False, ): """Generates a XDSM diagram from the process. By default, a self contined HTML file is generated, that can be viewed in a browser. :param outdir: the directory where XDSM json file is generated. if None, current working dir is used. If open_browser is True and outdir is None, generates a temporary directory to store the file :param outfilename: the file name of the generated XDSM json file (default: xdsm.json). :param latex_output: produces .tex, .tikz and .tex files if True If ``outdir`` is not set the XDSM json is printed on the standard output. :param open_browser: if True, opens the web browser with the XDSM :param html_output: if True, outputs a self contained HTML file :param json_output: if True, outputs a JSON file for XDSMjs :returns: XDSM json either in a file when ``outdir`` is set, ouput on the console otherwise. """ xdsm = self.xdsmize() xdsm_json = dumps(xdsm, indent=2, ensure_ascii=False) base = basename(outfilename) outfile_basename = splitext(base)[0] no_html_loc = False if outdir is None: outdir = getcwd() no_html_loc = True if json_output: with open(join(outdir, outfile_basename + ".json"), "w") as out: out.write(xdsm_json) if latex_output: xdsm_dict2tex(xdsm, outdir, outfile_basename) if html_output or open_browser: if no_html_loc: outdir = mkdtemp(suffix="", prefix="tmp", dir=None) out_file_path = join(outdir, outfile_basename + ".html") LOGGER.info("Generating HTML XDSM file in : %s", out_file_path) generate_xdsm_html(xdsm, out_file_path) if open_browser: url = "file://" + abspath(out_file_path) webbrowser.open(url, new=2) # open in new tab return out_file_path return xdsm
[docs] def get_all_sub_xdsmizers(self): """Retrieves all sub xdsmizers corresponding to sub Scenario objects :returns: the array of XDSMizer objects """ result = [] for sub in self.sub_xdsmizers: result.append(sub) result.extend(sub.get_all_sub_xdsmizers()) return result
[docs] @synchronized def xdsmize(self, algoname="Optimizer"): """Builds the Python data structure to be used to generate JSON format compatible with XDSMjs viewer. :param algoname: Default value = "Optimizer") :returns: the Python object containing relevant information for XDSM representation """ nodes = self._create_nodes(algoname) edges = self._create_edges() workflow = self._create_workflow() optpb = str(self.scenario.formulation.opt_problem) if self.level == 0: res = { self.hashref: { "nodes": nodes, "edges": edges, "workflow": workflow, "optpb": optpb, } } for sub_xdsmizer in self.get_all_sub_xdsmizers(): res[sub_xdsmizer.hashref] = sub_xdsmizer.xdsmize() return res return {"nodes": nodes, "edges": edges, "workflow": workflow, "optpb": optpb}
def _create_nodes(self, algoname): # pylint: disable=too-many-branches """Manages XDSM diagram nodes creation from optimization algorithm and disciplines :param algoname: name of the algorithm """ nodes = [] self.to_id = {} statuses = self.workflow.get_state_dict() # Optimization self.to_id[self.root_atom] = OPT_ID opt_node = {"id": OPT_ID, "name": algoname, "type": "optimization"} if statuses[self.root_atom.uuid]: opt_node["status"] = statuses[self.root_atom.uuid] nodes.append(opt_node) # Disciplines for atom_id, atom in enumerate( self.atoms ): # pylint: disable=too-many-nested-blocks # if a node already created from an atom with same discipline # at one level just reference the same node for ref_atom in self.to_id: if atom.discipline == ref_atom.discipline: self.to_id[atom] = self.to_id[ref_atom] if ( atom.status and atom.parent.status is MDODiscipline.STATUS_RUNNING ): node = None for a_node in nodes: if a_node["id"] == self.to_id[atom]: node = a_node break if not node: # TODO: add specific exception? raise "Node " + self.to_id[ atom ] + " not found in " + nodes # pragma: no cover node["status"] = atom.status break if atom in self.to_id: continue self.to_id[atom] = "Dis" + str(atom_id) node = {"id": self.to_id[atom], "name": atom.discipline.name} # node type if isinstance(atom.discipline, MDA): node["type"] = "mda" elif atom.discipline.is_scenario(): node["type"] = "mdo" node["subxdsm"] = self.to_hashref[atom] node["name"] = atom.discipline.name + "_" + self.to_hashref[atom] else: node["type"] = "analysis" if statuses[atom.uuid]: node["status"] = statuses[atom.uuid] nodes.append(node) return nodes def _create_edges(self): """Manage XDSM edges creation from scenario dataflow.""" edges = [] # convenient method to factorize code for creating and appending edges def add_edge(from_edge, to_edge, varnames): """Adds an edge :param from_edge: param to_edge: :param varnames: :param to_edge: """ edge = {"from": from_edge, "to": to_edge, "name": ", ".join(varnames)} edges.append(edge) # For User to/from optimization opt_pb = self.scenario.formulation.opt_problem # fct names such as -y4 functions_names = opt_pb.get_all_functions_names() # output variables used by the fonction (eg y4) fct_varnames = [f.outvars for f in opt_pb.get_all_functions()] function_varnames = [] for fvars in fct_varnames: function_varnames.extend(fvars) to_user = functions_names to_opt = self.scenario.get_optim_variables_names() add_edge(USER_ID, OPT_ID, [x + "^(0)" for x in to_opt]) add_edge(OPT_ID, USER_ID, [x + "^*" for x in to_user]) # Disciplines to/from optimization for atom in self.atoms: if atom is not self.root_atom: varnames = set(atom.discipline.get_input_data_names()) & set( self.scenario.get_optim_variables_names() ) if varnames: add_edge(OPT_ID, self.to_id[atom], varnames) varnames = set(atom.discipline.get_output_data_names()) & set( function_varnames ) # print set(disc.get_output_data_names()), set(functions_names) if varnames: add_edge(self.to_id[atom], OPT_ID, varnames) # Disciplines to User/Optimization (from User is already handled at # optimizer level) disc_to_opt = function_varnames for atom in self.atoms: if atom is not self.root_atom: # special case MDA : skipped if isinstance(atom.discipline, MDA): continue out_to_user = [ o for o in atom.discipline.get_output_data_names() if o not in disc_to_opt ] out_to_opt = [ o for o in atom.discipline.get_output_data_names() if o in disc_to_opt ] if out_to_user: add_edge(self.to_id[atom], USER_ID, [x + "^*" for x in out_to_user]) if out_to_opt: add_edge(self.to_id[atom], OPT_ID, out_to_opt) # Disciplines to/from disciplines for coupling in self.scenario.get_expected_dataflow(): (disc1, disc2, varnames) = coupling add_edge( self.to_id[self._find_atom(disc1)], self.to_id[self._find_atom(disc2)], varnames, ) return edges @staticmethod def _get_single_level_atoms(workflow): """ Retrieves the list of atoms of the given workflow without looking into loop execution sequences coming from Scenario. Thus retrieves the atoms for a one level XDSM diagram. :param workflow: execution sequence :returns: a list of atoms """ atoms = [] for seq in workflow.sequence_list: if isinstance(seq, LoopExecSequence): atoms.append(seq.atom_controller) if not seq.atom_controller.discipline.is_scenario(): atoms += XDSMizer._get_single_level_atoms(seq.iteration_sequence) elif isinstance(seq, AtomicExecSequence): atoms.append(seq) else: atoms += XDSMizer._get_single_level_atoms(seq) return atoms def _find_atom(self, discipline): """ Find atom corresponding to the given discipline. Raise exception if not found :param disicpline: an MDODiscipline :returns: the corresponding atom """ atom = None if isinstance(discipline, MDOScenarioAdapter): atom = self._find_atom(discipline.scenario) else: for atom_i in self.atoms: if discipline == atom_i.discipline: atom = atom_i if atom is None: disciplines = [a.discipline for a in self.atoms] raise ValueError( "Discipline {} " "not found in {}".format(discipline, disciplines) ) return atom @staticmethod def _find_sub_workflow(workflow, atom_controller): """ Find loop execution sequence sub_workflow with the given atom as controller within the given workflow :param atom_controller: the AtomicExecSequence object that controls the LoopExecutionSequence object to find :returns: the subworkflow (LoopExecutionSequence), None if not found """ sub_workflow = None for seq in workflow.sequence_list: if isinstance(seq, LoopExecSequence): if seq.atom_controller.uuid == atom_controller.uuid: sub_workflow = seq return sub_workflow sub_workflow = sub_workflow or XDSMizer._find_sub_workflow( seq.iteration_sequence, atom_controller ) elif not isinstance(seq, AtomicExecSequence): sub_workflow = sub_workflow or XDSMizer._find_sub_workflow( seq, atom_controller ) return sub_workflow def _create_workflow(self): """Manage XDSM workflow creation from formulation workflow""" workflow = [USER_ID, expand(self.workflow, self.to_id)] return workflow
[docs]def expand(wks, to_id): """Expands workflow structure as an ids structure using to_id mapping. The expansion preserve the structure while replacing the object by its id in all case except when a tuple is encountered as cdr then the expansion transforms loop[A, (B,C)] in [idA, {'parallel': [idB, idC]}] :param wks: the workflow structure :param to_id: the mapping dict from object to id :returns: the ids structure valid to be used as XDSM json chains """ if isinstance(wks, SerialExecSequence): res = [] for seq in wks.sequence_list: res += expand(seq, to_id) ids = res elif isinstance(wks, ParallelExecSequence): res = [] for seq in wks.sequence_list: res += expand(seq, to_id) ids = [{"parallel": res}] elif isinstance(wks, LoopExecSequence): if ( wks.atom_controller.discipline.is_scenario() and to_id[wks.atom_controller] != OPT_ID ): # sub-scnario consider only the controller ids = [to_id[wks.atom_controller]] else: ids = [to_id[wks.atom_controller], expand(wks.iteration_sequence, to_id)] elif isinstance(wks, AtomicExecSequence): ids = [to_id[wks]] else: raise Exception("Bad execution sequence : found " + str(wks)) return ids