Source code for gemseo.wrappers.disc_from_exe

# -*- coding: utf-8 -*-
# Copyright 2021 IRT Saint Exupéry, https://www.irt-saintexupery.com
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License version 3 as published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program; if not, write to the Free Software Foundation,
# Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.

# Contributors:
#    INITIAL AUTHORS - initial API and implementation and/or
#                      initial documentation
#        :author:  Francois Gallard
#    OTHER AUTHORS   - MACROSCOPIC CHANGES
"""
Make a discipline from an executable
************************************
"""
from __future__ import absolute_import, division, unicode_literals

import re
import subprocess
from ast import literal_eval
from copy import deepcopy
from multiprocessing import Lock, Manager
from os import listdir, mkdir
from os.path import join
from uuid import uuid1

from future import standard_library
from numpy import array

from gemseo import SOFTWARE_NAME
from gemseo.api import configure_logger
from gemseo.core.data_processor import FloatDataProcessor
from gemseo.core.discipline import MDODiscipline
from gemseo.core.json_grammar import JSONGrammar
from gemseo.utils.py23_compat import OrderedDict  # automatically dict from py36
from gemseo.utils.py23_compat import xrange

standard_library.install_aliases()

configure_logger(SOFTWARE_NAME)

from gemseo import LOGGER

INPUT_TAG = "GEMSEO_INPUT"
OUTPUT_TAG = "GEMSEO_OUTPUT"
INPUT_RE = INPUT_TAG + r"\{(.*)\}"
OUTPUT_RE = OUTPUT_TAG + r"\{(.*)\}"

INPUT_GRAMMAR = JSONGrammar.INPUT_GRAMMAR
OUTPUT_GRAMMAR = JSONGrammar.OUTPUT_GRAMMAR

NUMERICS = [str(j) for j in xrange(10)]


[docs]class DiscFromExe(MDODiscipline): """Generic wrapper for executables. The DiscFromExe is a generic wrapper for executables. It generates a MDODiscipline from an executable and in inputs/output files wrappers. The input and output files are described by templates. The templates can be generated by executing the module :mod:`~gemseo.wrappers.template_grammar_editor` to open a GUI. It requires the creation of templates for input and output file, for instance, from the following input JSON file: .. code:: { "a": 1.01515112125, "b": 2.00151511213, "c": 3.00151511213 } A template that declares the inputs must be generated under this format, where "a" is the name of the input, and "1.0" is the default input. GEMSEO_INPUT declares an input, GEMSEO_OUTPUT declares an output, similarly. .. code:: { "a": GEMSEO_INPUT{a::1.0}, "b": GEMSEO_INPUT{b::2.0}, "c": GEMSEO_INPUT{c::3.0} } Current limitations : Only one input and one output file, otherwise, inherit from this class and modify the parsers. Only limited input writing and output parser strategies are implemented. To change that, you can pass custom parsing and writing methods to the constructor. The only limitation in the current file format is that it must be a plain text file and not a binary file. In this case, the way of interfacing it is to provide a specific parser to the DiscFromExe, with the write_input_file_method and parse_outfile_method arguments of the constructor. """ NUMBERED = "numbered" UUID = "uuid" KEY_VALUE_PARSER = "KEY_VALUE_PARSER" TEMPLATE_PARSER = "TEMPLATE_PARSER" def __init__( self, input_template, output_template, output_folder_basepath, executable_command, input_filename, output_filename, folders_iter=NUMBERED, name=None, parse_outfile_method=TEMPLATE_PARSER, write_input_file_method=None, parse_out_separator="=", ): """Constructor. Create the discipline from the inputs, outputs wrapper and the executable command. :param str input_template: path to the input file template. The input locations in the file are marked by GEMSEO_INPUT{input_name::1.0}, where "input_name" is the input name, and 1.0 is here the default input :param str output_template: path to the output file template. The input locations in the file are marked by GEMSEO_OUTPUT{output_name::1.0}, where "output_name" is the input name :param str output_folder_basepath: path to the output folder, in which the executions will be performed :param str executable_command: command to run the executable. Will be called through a system call. Example: "python myscript.py -i input.txt -o output.txt :param str input_filename: name of the input file. This will determine the name of the input file generated in the output folder. Example "input.txt" :param str output_filename: name of the output file. This will determine the name of the output file generated in the output folder. Example "output.txt" :param str folders_iter: type of unique identifiers for the output folders. If NUMBERED the generated output folders will be "output_folder_basepath"+str(i+1), where i is the maximum value of the already existing "output_folder_basepath"+str(i) folders. Otherwise, a unique number based on the UUID function is generated. This last option shall be used if multiple MDO processes are runned in the same work directory. :param str parse_outfile_method: optional method that can be provided by the user to parse the output file. To see the signature of the method, see the parse_outfile method of this file. :param str parse_out_separator: if the KEY_VALUE_PARSER is used as output parser, specify the separator key (default : "="). :param str write_input_file_method: method to write the input file, if None, use this modules' write_input_file. To see the signature of the method, see the write_input_file method of this file. """ super(DiscFromExe, self).__init__(name=name) self.input_template = input_template self.output_template = output_template self.input_filename = input_filename self.output_filename = output_filename self.executable_command = executable_command use_template_parse = parse_outfile_method == self.TEMPLATE_PARSER if parse_outfile_method is None or use_template_parse: self.parse_outfile = parse_outfile elif parse_outfile_method == self.KEY_VALUE_PARSER: self.parse_outfile = lambda a, b: parse_key_value_file( a, b, parse_out_separator ) else: self.parse_outfile = parse_outfile_method if not hasattr(self.parse_outfile, "__call__"): raise TypeError("The parse_outfile_method must be callable") self.write_input_file = write_input_file_method or write_input_file if not hasattr(self.write_input_file, "__call__"): raise TypeError("The write_input_file_method must be callable") self.lock = Lock() self.folders_iter = folders_iter self.output_folder_basepath = output_folder_basepath self._out_pos = None self._in_dict = None self._out_dict = None self._in_lines = None self._out_lines = None n_dirs = self._get_max_outdir() self.counter = Manager().Value("i", n_dirs) self.data_processor = FloatDataProcessor() self._parse_templates() def _parse_templates(self): """Parse the templates. Parse the templates and: Initialize the grammars Initialize the attributes : self._in_lines, self._out_lines, self._out_pos, self._out_pos self.default_inputs """ with open(self.input_template, "r") as infile: self._in_lines = infile.readlines() with open(self.output_template, "r") as outfile: self._out_lines = outfile.readlines() self._in_dict, self._in_pos = parse_template(self._in_lines, INPUT_GRAMMAR) self.input_grammar.initialize_from_data_names(self._in_dict.keys()) out_dict, self._out_pos = parse_template(self._out_lines, OUTPUT_GRAMMAR) self.output_grammar.initialize_from_data_names(out_dict.keys()) msg = "Initialize discipline from template. \ Input grammar: {}".format( self._in_dict.keys() ) LOGGER.debug(msg) msg = "Initialize discipline from template. \ Output grammar: {}".format( out_dict.keys() ) LOGGER.debug(msg) self.default_inputs = { k: array([literal_eval(v)]) for k, v in self._in_dict.items() } def _run(self): """Run the wrapper.""" uuid = self.generate_uid() out_dir = join(self.output_folder_basepath, uuid) mkdir(out_dir) input_file_path = join(out_dir, self.input_filename) self.write_input_file( input_file_path, self.local_data, self._in_pos, self._in_lines ) err = subprocess.call( self.executable_command, shell=True, stderr=subprocess.STDOUT, cwd=out_dir ) if err != 0: raise RuntimeError("Execution failed and returned error code : " + str(err)) outfile = join(out_dir, self.output_filename) with open(outfile, "r") as outfile: out_lines = outfile.readlines() if len(out_lines) != len(self._out_lines): raise ValueError( "The number of lines of the output file changed." "This is not supported yet" ) out_vals = self.parse_outfile(self._out_pos, out_lines) self.local_data.update(out_vals)
[docs] def generate_uid(self): """Generate an UUID. Generate a unique identifier for the current execution If the folders_iter strategy is NUMBERED, the successive iterations are named by an integer 1, 2, 3 etc. This is multiprocess safe. Otherwise, a unique number based on the UUID function is generated. This last option shall be used if multiple MDO processes are runned in the same workdir. :returns: a unique string identifier :rtype: str """ if self.folders_iter == self.NUMBERED: with self.lock: self.counter.value += 1 return str(self.counter.value) return str(uuid1()).split("-")[-1]
def _list_out_dirs(self): """List the directories in the output folder path.""" return listdir(self.output_folder_basepath) def _get_max_outdir(self): """Get the maximum current index of output folders.""" outs = list(self._list_out_dirs()) if not outs: return 0 return max([literal_eval(n) for n in outs])
[docs]def parse_template(template_lines, grammar_type=INPUT_GRAMMAR): """Parse the input or output template. :param template_lines: list of lines of the file template (result of file.readlines()) :param grammar_type: INPUT_GRAMMAR or OUTPUT_GRAMMAR :returns: data_dict, pos_dict, where data_dict is the {name:value} dict, where name is the data name and value is the parsed input or output value in the template pos_dict in the format dictionary containing the information from the template format {data_name:(start,end,line_number)}, where name is the name of the input data, start is the index of the starting point in the input file template. This index is a line index (character number on the line) end is the index of the end character in the template line_number is the index of the line in the file """ if grammar_type == INPUT_GRAMMAR: pattern_re = INPUT_RE elif grammar_type == OUTPUT_GRAMMAR: pattern_re = OUTPUT_RE else: raise ValueError("Unknown grammar type " + str(grammar_type)) regex = re.compile(pattern_re) # , re.MULTILINE data_dict = OrderedDict() pos_dict = OrderedDict() for lineid, line in enumerate(template_lines): for match in regex.finditer(line): data = match.groups()[0] spl = data.split("::") name = spl[0] val = spl[1] data_dict[name] = val # When input mode: erase the template value if grammar_type == INPUT_GRAMMAR: start, end = match.start(), match.end() else: # In output mode : catch all # the output lenght and not more start = match.start() end = start + len(val) pos_dict[name] = (start, end, lineid) return data_dict, pos_dict
[docs]def write_input_file( input_file_path, data, input_positions, input_lines, float_format="{:1.18g}" ): """Write the input file from the input data. :param input_file_path: absolute path to the file to be written :param data: data dictionary, ie the local data of the discipline :param input_positions: dictionary containing the information from the template format {data_name:(start,end,line_number)}, where name is the name of the input data, start is the index of the starting point in the input file template. This index is a line index (character number on the line) end is the index of the end character in the template line_number is the index of the line in the file :param input_lines: list of lines of the input file template (result of file.readlines()) :param float_format: formating of the input data in the file """ f_text = deepcopy(input_lines) for name, pos in input_positions.items(): start, end, lineid = pos data_str = float_format.format(data[name]) cline = f_text[lineid] f_text[lineid] = cline[:start] + data_str + cline[end:] with open(input_file_path, "w") as infile_o: infile_o.writelines(f_text)
[docs]def parse_key_value_file(_, out_lines, separator="="): """Parse the output file from the expected text positions. :param out_lines: list of lines of the output file template (result of file.readlines()) :param separator: separating characters of the key=value format :returns: the values dictionary in dict of numpy array formats """ data = {} for line in out_lines: if separator in line: spl = line.strip().split(separator) if len(spl) != 2: raise ValueError("unbalanced = in line " + str(line)) key = spl[0].strip() try: data[key] = float(literal_eval(spl[1].strip())) except Exception: raise ValueError("Failed to parse value as float " + str(spl[1])) return data
[docs]def parse_outfile(output_positions, out_lines): """Parse the output file from the expected text positions. :param output_positions: dictionary containing the information from the template format {data_name:(start,end,dictionary)}, where name is the name of the output data, start is the index of the starting point in the input file template. This index is a line index (character number on the line) end is the index of the end character in the template line_number is the index of the line in the file :param out_lines: list of lines of the output file template (result of file.readlines()) :returns: the values dictionary in dict of numpy array formats """ values = {} for name, pos in output_positions.items(): start, _, lineid = pos found_dot = False found_e = False # In case generated files has less lines if lineid > len(out_lines) - 1: break out_text = out_lines[lineid] i = start maxi = len(out_text) while True: # The problem is that the output file used for the template may be # using a output that is longer or shorter than the one generated # at runtime. We must find the proper end of the expression... i += 1 char = out_text[i] if char == ".": # We found the . in float notation if found_dot or found_e: break found_dot = True continue # We found the e in exp notation if char in ("E", "e"): if found_e: break found_e = True continue # Check that we have nout reached EOL or space or whatever if char not in NUMERICS: break if i == maxi - 1: print("IBRAEAU") break outv = out_text[pos[0] : i] LOGGER.info('Parsed "' + name + '" got output ' + outv) values[name] = array([float(outv)]) return values