Source code for moead_framework.algorithm.combinatorial.moead_dra

import numpy as np
from moead_framework.algorithm.combinatorial.moead_delta_nr import MoeadDeltaNr
from moead_framework.core.sps_strategy.sps_dra import SpsDra


class MoeadDRA(MoeadDeltaNr):
    """
    Implementation of MOEA/D-DRA

    Q. Zhang, W. Liu, and H. Li. The performance of a new version of moea/d on cec09 unconstrained mop test instances. In 2009 IEEE Congress on Evolutionary Computation, volume, 203–208. 2009. doi:10.1109/CEC.2009.4982949.

    Example:

    >>> from moead_framework.aggregation import Tchebycheff
    >>> from moead_framework.algorithm.combinatorial import MoeadDRA
    >>> from moead_framework.problem.combinatorial import Rmnk
    >>>
    >>> # The file is available here : https://github.com/moead-framework/data/blob/master/problem/RMNK/Instances/rmnk_0_2_100_1_0.dat
    >>> # Others instances are available here : https://github.com/moead-framework/data/tree/master/problem/RMNK/Instances
    >>> instance_file = "moead_framework/test/data/instances/rmnk_0_2_100_1_0.dat"
    >>> rmnk = Rmnk(instance_file=instance_file)
    >>>
    >>> number_of_weight = 10
    >>> number_of_weight_neighborhood = 2
    >>> number_of_evaluations = 1000
    >>> # The file is available here : https://github.com/moead-framework/data/blob/master/weights/SOBOL-2objs-10wei.ws
    >>> # Others weights files are available here : https://github.com/moead-framework/data/tree/master/weights
    >>> weight_file = "moead_framework/test/data/weights/SOBOL-" + str(rmnk.number_of_objective) + "objs-" + str(number_of_weight) + "wei.ws"
    >>>
    >>> moead = MoeadDRA(problem=rmnk,
    >>>                  max_evaluation=number_of_evaluations,
    >>>                  delta=0.9,
    >>>                  number_of_replacement=1,
    >>>                  number_of_weight_neighborhood=number_of_weight_neighborhood,
    >>>                  weight_file=weight_file,
    >>>                  aggregation_function=Tchebycheff,
    >>>                  )
    >>>
    >>> population = moead.run()
    """

    def __init__(self, problem,
                 max_evaluation,
                 number_of_weight_neighborhood,
                 delta,
                 number_of_replacement,
                 aggregation_function,
                 weight_file,
                 number_of_objective=None,
                 number_of_crossover_points=None,
                 threshold_before_evaluate_subproblem_utility=50,
                 delta_threshold=0.001,
                 ):
        """
        Constructor of the algorithm.

        :param problem: {:class:`~moead_framework.problem.Problem`} problem to optimize
        :param max_evaluation: {integer} maximum number of evaluation
        :param number_of_weight_neighborhood: {integer} size of the neighborhood
        :param delta: {float} probability to use all the population as neighborhood
        :param number_of_replacement: {integer} maximum number of solutions replaced in the population for each new offspring generated
        :param aggregation_function: {:class:`~moead_framework.aggregation.functions.AggregationFunction`}
        :param weight_file: {string} path of the weight file. Each line represent a weight vector, each column represent a coordinate. An example is available here: https://github.com/moead-framework/data/blob/master/weights/SOBOL-2objs-10wei.ws
        :param number_of_crossover_points: {integer} number of crossover point
        :param threshold_before_evaluate_subproblem_utility: Optional -- number of generations between two updates of the sub-problem utilities. The default value is 50
        :param delta_threshold: Optional -- reset the utility if the relative decrease delta_i is under this threshold. The default value is 0.001
        :param number_of_objective: Deprecated -- {integer} number of objective in the problem. Deprecated, remove in the next major release.
        """
        super().__init__(problem=problem,
                         max_evaluation=max_evaluation,
                         number_of_objective=number_of_objective,
                         number_of_weight_neighborhood=number_of_weight_neighborhood,
                         delta=delta,
                         number_of_replacement=number_of_replacement,
                         aggregation_function=aggregation_function,
                         number_of_crossover_points=number_of_crossover_points,
                         sps_strategy=SpsDra,
                         weight_file=weight_file)

        # One utility value per sub-problem, all starting at 1.
        # self.number_of_weight is read here, so it is expected to have been
        # set by the parent constructor.
        self.pi = np.ones(self.number_of_weight)
        self.gen = 0
        self.current_eval = 1
        self.threshold_before_evaluate_subproblem_utility = threshold_before_evaluate_subproblem_utility
        self.delta_threshold = delta_threshold
        self.mating_pool = []
        # scores[i] == [old score, new score] for sub-problem i
        # (see update_scores). Each sub-problem gets its own inner list.
        self.scores = [[0, 0] for _ in range(self.number_of_weight)]

    def run(self, checkpoint=None):
        """
        Execute the algorithm.

        :param checkpoint: {function} The default value is None. The checkpoint can be used to save data during the process. The function required one parameter of type {:class:`~moead_framework.algorithm.abstract_moead.AbstractMoead`}
        :return: list<{:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`}> All non-dominated solutions found by the algorithm

        Example:

        >>> from moead_framework.algorithm.combinatorial import Moead
        >>> from moead_framework.algorithm import AbstractMoead
        >>> from moead_framework.tool.result import save_population
        >>> moead = Moead(...)
        >>>
        >>> def checkpoint(moead_algorithm: AbstractMoead):
        >>>     if moead_algorithm.current_eval % 10 == 0 :
        >>>         filename = "non_dominated_solutions-eval" + str(moead_algorithm.current_eval) + ".txt"
        >>>         save_population(file_name=filename, population=moead_algorithm.ep)
        >>>
        >>> population = moead.run(checkpoint)
        """
        period = self.threshold_before_evaluate_subproblem_utility

        while self.current_eval < self.max_evaluation:

            for i in self.get_sub_problems_to_visit():

                if checkpoint is not None:
                    checkpoint(self)

                self.optimize_sub_problem(sub_problem_index=i)

            # Update the score history of all sub-problems on the very first
            # generation and on the generation just before the utilities are
            # computed, so that update_pi() compares the two latest snapshots.
            if (self.gen + 1) % period == 0 or self.gen == 0:
                for sub_problem in range(self.number_of_weight):
                    score = self.aggregation_function.run(solution=self.population[sub_problem],
                                                          number_of_objective=self.number_of_objective,
                                                          weights=self.weights,
                                                          sub_problem=sub_problem,
                                                          z=self.z)
                    self.update_scores(sub_problem=sub_problem, score=score)

            self.gen += 1

            if self.gen % period == 0:
                self.update_pi()

        return self.ep

    def update_scores(self, sub_problem, score):
        """
        Update the score history of one sub-problem.

        self.scores[sub_problem][0] = old score
        self.scores[sub_problem][1] = new score

        :param sub_problem: {integer} index of the current sub-problem
        :param score: {float} latest aggregated score of the sub-problem
        :return: None
        """
        history = self.scores[sub_problem]
        # Shift the previous "new" score into the "old" slot, then record the latest.
        history[0] = history[1]
        history[1] = score

    def compute_delta(self, i):
        """
        Compute the relative decrease delta_i of the score of a sub-problem.

        :param i: {integer} index of sub-problem
        :return: {float} (old score - new score) / old score, or 0 when the new score is 0
        """
        old_value = self.scores[i][0]
        new_value = self.scores[i][1]

        # No previous score recorded yet (still at its initial 0): compare the
        # new score against itself, which yields a delta of 0.
        if old_value == 0:
            old_value = new_value

        # When g returns 0; this also protects the division below.
        # NOTE(review): this returns 0 even when old_value is non-zero and
        # new_value is 0 -- confirm this is intended.
        if new_value == 0:
            return 0

        return (old_value - new_value) / old_value

    def update_pi(self):
        """
        Update the utility of each sub-problem.

        Nothing is done unless the current generation is a multiple of
        ``threshold_before_evaluate_subproblem_utility``. For each sub-problem
        the utility is reset to 1 when its relative decrease is above
        ``delta_threshold``; otherwise it is scaled by
        ``0.95 + 0.05 * delta_i / delta_threshold``.

        :return: None
        """
        # Use the configured period instead of a hard-coded 50 so that this
        # guard agrees with the schedule applied in run().
        if self.gen % self.threshold_before_evaluate_subproblem_utility != 0:
            return

        # for each sub_problem
        for i in range(self.number_of_weight):
            delta_i = self.compute_delta(i=i)

            if delta_i > self.delta_threshold:
                self.pi[i] = 1
            else:
                old_pi = self.pi[i]
                self.pi[i] = (0.95 + 0.05 * (delta_i / self.delta_threshold)) * old_pi