Source code for moead_framework.algorithm.abstract_moead

import random

import numpy as np

from moead_framework.aggregation.functions import AggregationFunction
from moead_framework.core.genetic_operator import GeneticOperator
from moead_framework.core.offspring_generator.abstract_mating import OffspringGenerator
from moead_framework.core.offspring_generator.offspring_generator import OffspringGeneratorGeneric
from moead_framework.core.parent_selector.abstract_parent_selector import ParentSelector
from moead_framework.core.selector.abstract_selector import MatingPoolSelector
from moead_framework.core.selector.closest_neighbors_selector import ClosestNeighborsSelector
from moead_framework.core.sps_strategy.abstract_sps import SpsStrategy
from moead_framework.core.sps_strategy.sps_all import SpsAllSubproblems
from moead_framework.core.termination_criteria.abstract_termination_criteria import TerminationCriteria
from moead_framework.core.termination_criteria.max_evaluation import MaxEvaluation
from moead_framework.problem import Problem
from moead_framework.tool.mop import is_duplicated, get_non_dominated, generate_weight_vectors


class AbstractMoead:
    """
    Abstract class to implement a new algorithm based on MOEA/D in the framework

    https://moead-framework.github.io/framework/html/tuto.html#implement-your-own-algorithm
    """

    def __init__(self, problem, max_evaluation, number_of_weight_neighborhood,
                 aggregation_function, weight_file,
                 termination_criteria=None,
                 genetic_operator=None,
                 parent_selector=None,
                 mating_pool_selector=None,
                 sps_strategy=None,
                 offspring_generator=None,
                 number_of_weight=None,
                 number_of_objective=None,
                 ):
        """
        Constructor of the algorithm.

        Components are given as classes (not instances); they are instantiated here with the
        algorithm as argument, except the genetic operator which is stored as a class.

        :param problem: {:class:`~moead_framework.problem.Problem`} problem to optimize
        :param max_evaluation: {integer} maximum number of evaluation
        :param number_of_weight_neighborhood: {integer} size of the neighborhood.
        :param aggregation_function: {:class:`~moead_framework.aggregation.functions.AggregationFunction`}
        :param weight_file: {string} path of the weight file. Each line represent a weight vector, each column represent a coordinate. An example is available here: https://github.com/moead-framework/data/blob/master/weights/SOBOL-2objs-10wei.ws
        :param termination_criteria: Optional -- {:class:`~moead_framework.core.termination_criteria.abstract_termination_criteria.TerminationCriteria`} The default component is {:class:`~moead_framework.core.termination_criteria.max_evaluation.MaxEvaluation`}
        :param genetic_operator: Optional -- {:class:`~moead_framework.core.genetic_operator.abstract_operator.GeneticOperator`} The default operator depends on the problem type (combinatorial / numerical). This abstract constructor provides no default itself: a ``TypeError`` is raised if the value received here is not a ``GeneticOperator`` subclass.
        :param parent_selector: Optional -- {:class:`~moead_framework.core.parent_selector.abstract_parent_selector.ParentSelector`} The default operator depends on the number of solution required by the genetic operator. This abstract constructor provides no default itself: a ``TypeError`` is raised if the value received here is not a ``ParentSelector`` subclass.
        :param mating_pool_selector: Optional -- {:class:`~moead_framework.core.selector.abstract_selector.MatingPoolSelector`} The default selector is {:class:`~moead_framework.core.selector.closest_neighbors_selector.ClosestNeighborsSelector`}
        :param sps_strategy: Optional -- {:class:`~moead_framework.core.sps_strategy.abstract_sps.SpsStrategy`} The default strategy is {:class:`~moead_framework.core.sps_strategy.sps_all.SpsAllSubproblems`}
        :param offspring_generator: Optional -- {:class:`~moead_framework.core.offspring_generator.abstract_mating.OffspringGenerator`} The default generator is {:class:`~moead_framework.core.offspring_generator.offspring_generator.OffspringGeneratorGeneric`}
        :param number_of_weight: Deprecated -- {integer} number of weight vector used to decompose the problem. Deprecated, remove in the next major release.
        :param number_of_objective: Deprecated -- {integer} number of objective in the problem. Deprecated, remove in the next major release.
        :raises TypeError: if one of the arguments does not have the expected type
        """
        if not isinstance(problem, Problem):
            raise TypeError("The expected type of `problem` is `Problem`")
        self.problem = problem

        if not isinstance(weight_file, str):
            raise TypeError("The expected type of `weight_file` is `Str`")

        aggregation_class = self._require_subclass(aggregation_function,
                                                   AggregationFunction,
                                                   "aggregation_function")
        self.aggregation_function = aggregation_class()

        if number_of_weight is not None or number_of_objective is not None:
            import warnings

            # Both values are now derived from the weight file and from the problem.
            if number_of_weight is not None:
                warnings.warn("The attribute number_of_weight is deprecated", DeprecationWarning)
            if number_of_objective is not None:
                warnings.warn("The attribute number_of_objective is deprecated", DeprecationWarning)

        # Built before `max_evaluation` is stored, exactly as in the historical order.
        self.termination_criteria = self._build_component(termination_criteria,
                                                          TerminationCriteria,
                                                          "termination_criteria",
                                                          MaxEvaluation)

        if not isinstance(max_evaluation, int):
            raise TypeError("The expected type of `max_evaluation` is `int`")
        self.max_evaluation = max_evaluation

        self.number_of_objective = self.problem.number_of_objective

        if not isinstance(number_of_weight_neighborhood, int):
            raise TypeError("The expected type of `number_of_weight_neighborhood` is `int`")
        self.t = number_of_weight_neighborhood

        # State of the search. The order matters: the archive must exist before the
        # population is generated, and the neighborhoods are computed on the shuffled weights.
        self.ep = []
        self.weights = generate_weight_vectors(weight_file, shuffle=False)
        self.number_of_weight = len(self.weights)
        self.population = self.initial_population()
        random.shuffle(self.weights)
        self.z = self.init_z()
        self.b = self.generate_closest_weight_vectors()
        self.current_sub_problem = -1
        self.current_eval = 1
        self.mating_pool = []

        self.sps_strategy = self._build_component(sps_strategy,
                                                  SpsStrategy,
                                                  "sps_strategy",
                                                  SpsAllSubproblems)

        self.mating_pool_selector = self._build_component(mating_pool_selector,
                                                          MatingPoolSelector,
                                                          "mating_pool_selector",
                                                          ClosestNeighborsSelector)

        # The genetic operator is kept as a class: it is not instantiated here.
        self.genetic_operator = self._require_subclass(genetic_operator,
                                                       GeneticOperator,
                                                       "genetic_operator")

        # The parent selector takes the keyword `algorithm`, unlike the other components.
        parent_selector_class = self._require_subclass(parent_selector,
                                                       ParentSelector,
                                                       "parent_selector")
        self.parent_selector = parent_selector_class(algorithm=self)

        self.offspring_generator = self._build_component(offspring_generator,
                                                         OffspringGenerator,
                                                         "offspring_generator",
                                                         OffspringGeneratorGeneric)

    @staticmethod
    def _require_subclass(candidate, base, param_name):
        """
        Check that a component is a class deriving from the expected base class.

        `issubclass` itself raises an unrelated error message when its first argument is not
        a class (``None``, an instance, ...), hence the explicit `isinstance(..., type)` guard.

        :param candidate: value received by the constructor
        :param base: {type} expected base class
        :param param_name: {string} name of the constructor parameter, used in the error message
        :return: {type} the candidate, unchanged
        :raises TypeError: if the candidate is not a subclass of the base class
        """
        if isinstance(candidate, type) and issubclass(candidate, base):
            return candidate

        raise TypeError("The expected type of `{}` is `{}`".format(param_name, base.__name__))

    def _build_component(self, candidate, base, param_name, default):
        """
        Instantiate a component bound to this algorithm.

        :param candidate: component class given by the user, or None to use the default one
        :param base: {type} expected base class of the component
        :param param_name: {string} name of the constructor parameter, used in the error message
        :param default: {type} class used when the candidate is None
        :return: instance of the component, created with ``algorithm_instance=self``
        :raises TypeError: if the candidate is neither None nor a subclass of the base class
        """
        if candidate is None:
            component_class = default
        else:
            component_class = self._require_subclass(candidate, base, param_name)

        return component_class(algorithm_instance=self)

    def run(self, checkpoint=None):
        """
        Execute the algorithm.

        The termination criteria is tested between two generations only: all sub-problems
        selected for a generation are visited before the next test.

        :param checkpoint: {function} The default value is None. The checkpoint can be used to save data during the process. The function required one parameter of type {:class:`~moead_framework.algorithm.abstract_moead.AbstractMoead`}. It is called before the optimization of each sub-problem.
        :return: list<{:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`}> All non-dominated solutions found by the algorithm

        Example:

        >>> from moead_framework.algorithm.combinatorial import Moead
        >>> from moead_framework.tool.result import save_population
        >>> moead = Moead(...)
        >>>
        >>> def checkpoint(moead_algorithm: AbstractMoead):
        >>>     if moead_algorithm.current_eval % 10 == 0 :
        >>>         filename = "non_dominated_solutions-eval" + str(moead_algorithm.current_eval) + ".txt"
        >>>         save_population(file_name=filename, population=moead_algorithm.ep)
        >>>
        >>> population = moead.run(checkpoint)
        """
        while self.termination_criteria.test():

            # One generation: visit every sub-problem chosen by the SPS strategy
            for sub_problem_index in self.get_sub_problems_to_visit():

                if checkpoint is not None:
                    checkpoint(self)

                self.optimize_sub_problem(sub_problem_index=sub_problem_index)

        return self.ep

    def optimize_sub_problem(self, sub_problem_index):
        """
        Execute the process to optimize the sub-problem currently iterated:
        mating pool selection, offspring generation, repair, update of the reference
        point and update of the solutions. The evaluation counter is then incremented.

        :param sub_problem_index: {integer} index of the sub-problem iterated
        :return:
        """
        self.update_current_sub_problem(sub_problem=sub_problem_index)

        # Work on a copy so that the list owned by the selector is never shared
        self.mating_pool = self.mating_pool_selection(sub_problem=sub_problem_index)[:]

        offspring = self.generate_offspring(population=self.mating_pool)
        offspring = self.repair(solution=offspring)

        self.update_z(solution=offspring)
        self.update_solutions(solution=offspring,
                              aggregation_function=self.aggregation_function,
                              sub_problem=sub_problem_index)

        self.current_eval += 1

    def update_solutions(self, solution, aggregation_function, sub_problem):
        """
        Update solutions of the population and of the external archive ep

        :param solution: {:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`} the candidate solution also called offspring
        :param aggregation_function: {:class:`~moead_framework.aggregation.functions.AggregationFunction`} Aggregation function used to compare solution in a multi-objective context
        :param sub_problem: {integer} index of the sub-problem currently visited
        :return:
        """
        for j in self.b[sub_problem]:
            y_score = aggregation_function.run(solution=solution,
                                               number_of_objective=self.number_of_objective,
                                               weights=self.weights,
                                               sub_problem=j,
                                               z=self.z)

            pop_j_score = aggregation_function.run(solution=self.population[j],
                                                   number_of_objective=self.number_of_objective,
                                                   weights=self.weights,
                                                   sub_problem=j,
                                                   z=self.z)

            if aggregation_function.is_better(pop_j_score, y_score):
                # The offspring replaces the solution of the neighbor j ...
                self.population[j] = solution

                # ... and enters the archive, which is then reduced to its non-dominated solutions
                if not is_duplicated(x=solution, population=self.ep, number_of_objective=self.number_of_objective):
                    self.ep.append(solution)
                    self.ep = get_non_dominated(self.ep)

    def get_sub_problems_to_visit(self):
        """
        Select sub-problems to visit for the next generation.
        This function calls the component :class:`~moead_framework.core.sps_strategy.abstract_sps.SpsStrategy`

        :return: {list} indexes of sub-problems
        """
        return self.sps_strategy.get_sub_problems()

    def mating_pool_selection(self, sub_problem):
        """
        Select the set of solutions where future parents solutions will be selected according to the current sub-problem visited.
        This function calls the component :class:`~moead_framework.core.selector.abstract_selector.MatingPoolSelector`

        :param sub_problem: {integer} index of the sub-problem currently visited
        :return: {list} indexes of sub-problems
        """
        return self.mating_pool_selector.select(sub_problem)

    def generate_offspring(self, population):
        """
        Generate a new offspring.
        This function calls the component :class:`~moead_framework.core.offspring_generator.abstract_mating.OffspringGenerator`

        :param population: {list<:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`>} set of solutions (parents) used to generate the offspring
        :return: {:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`} offspring
        """
        return self.offspring_generator.run(population_indexes=population)

    def repair(self, solution):
        """
        Repair the solution in parameter.
        The default implementation returns the solution unchanged.

        :param solution: {:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`}
        :return: {:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`} the repaired solution
        """
        return solution

    def update_current_sub_problem(self, sub_problem):
        """
        Update the attribute current_sub_problem

        :param sub_problem: {integer} index of sub-problem
        :return:
        """
        self.current_sub_problem = sub_problem

    def initial_population(self):
        """
        Initialize the population of solution: one random solution per weight vector.
        Each new solution is also inserted in the external archive ep if it is not a duplicate.

        :return: {List<:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`>}
        """
        population = []

        for _ in range(self.number_of_weight):
            x_i = self.problem.generate_random_solution()
            population.append(x_i)

            if not is_duplicated(x=x_i, population=self.ep, number_of_objective=self.number_of_objective):
                self.ep.append(x_i)
                self.ep = get_non_dominated(self.ep)

        return population

    def init_z(self):
        """
        Initialize the reference point z from the current population.

        The point starts at zero for every objective and is only ever lowered, so a
        coordinate stays at 0 when no solution has a negative value for that objective.

        :return: {numpy.ndarray} the reference point, one coordinate per objective
        """
        z = np.zeros(self.number_of_objective)

        for i in range(self.number_of_weight):
            objectives = self.population[i].F

            for j in range(self.number_of_objective):
                f_j = objectives[j]

                if z[j] > f_j:  # in minimisation context !
                    z[j] = f_j

        return z

    def update_z(self, solution):
        """
        Update the reference point z with coordinates of the solution in parameter if coordinates are better.

        :param solution: :class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution` solution used to update the reference point
        :return:
        """
        for i in range(self.number_of_objective):
            f_i = solution.F[i]

            if self.z[i] > f_i:  # in minimisation context !
                self.z[i] = f_i

    def generate_closest_weight_vectors(self):
        """
        Generate all neighborhood for each solution in the population.

        The neighborhood of a sub-problem contains the indexes of the `t` weight vectors
        closest to its own one (Euclidean distance), itself included. The indexes are not
        sorted by distance. If `t` is larger than the number of weight vectors, the
        neighborhood simply contains every sub-problem once.

        :return: {list<List<Integer>>} List of sub-problem (neighborhood) for each solution
        """
        # Convert each weight vector once instead of inside the double loop
        weight_arrays = [np.array(weight) for weight in self.weights]

        b = []
        for i in range(self.number_of_weight):
            b_i = np.zeros(self.t, int)
            b_dist = np.full(self.t, np.inf)

            for j in range(self.number_of_weight):
                dist_wi_wj = np.linalg.norm(weight_arrays[i] - weight_arrays[j])

                if dist_wi_wj < np.max(b_dist):
                    index_to_replace = np.argmax(b_dist)  # replace the worst distance
                    b_dist[index_to_replace] = dist_wi_wj
                    b_i[index_to_replace] = j

            # Slots still at infinity were never filled (t > number of weights): they hold
            # the placeholder index 0 and must not be reported as neighbors.
            b.append(b_i[np.isfinite(b_dist)].tolist())

        return b