import random
import numpy as np
from moead_framework.aggregation.functions import AggregationFunction
from moead_framework.core.genetic_operator import GeneticOperator
from moead_framework.core.offspring_generator.abstract_mating import OffspringGenerator
from moead_framework.core.offspring_generator.offspring_generator import OffspringGeneratorGeneric
from moead_framework.core.parent_selector.abstract_parent_selector import ParentSelector
from moead_framework.core.selector.abstract_selector import MatingPoolSelector
from moead_framework.core.selector.closest_neighbors_selector import ClosestNeighborsSelector
from moead_framework.core.sps_strategy.abstract_sps import SpsStrategy
from moead_framework.core.sps_strategy.sps_all import SpsAllSubproblems
from moead_framework.core.termination_criteria.abstract_termination_criteria import TerminationCriteria
from moead_framework.core.termination_criteria.max_evaluation import MaxEvaluation
from moead_framework.problem import Problem
from moead_framework.tool.mop import is_duplicated, get_non_dominated, generate_weight_vectors
class AbstractMoead:
    """
    Abstract class to implement a new algorithm based on MOEA/D in the framework

    https://moead-framework.github.io/framework/html/tuto.html#implement-your-own-algorithm
    """

    def __init__(self, problem, max_evaluation, number_of_weight_neighborhood,
                 aggregation_function, weight_file,
                 termination_criteria=None,
                 genetic_operator=None,
                 parent_selector=None,
                 mating_pool_selector=None,
                 sps_strategy=None,
                 offspring_generator=None,
                 number_of_weight=None,
                 number_of_objective=None,
                 ):
        """
        Constructor of the algorithm.

        All components (aggregation function, termination criteria, selectors, ...) must be
        given as classes, not instances. Except for the genetic operator, which is stored
        as a class, they are instantiated here.

        :param problem: {:class:`~moead_framework.problem.Problem`} problem to optimize
        :param max_evaluation: {integer} maximum number of evaluation
        :param number_of_weight_neighborhood: {integer} size of the neighborhood.
        :param aggregation_function: {:class:`~moead_framework.aggregation.functions.AggregationFunction`}
        :param weight_file: {string} path of the weight file. Each line represents a weight vector, each column represents a coordinate. An example is available here: https://github.com/moead-framework/data/blob/master/weights/SOBOL-2objs-10wei.ws
        :param termination_criteria: Optional -- {:class:`~moead_framework.core.termination_criteria.abstract_termination_criteria.TerminationCriteria`} The default component is {:class:`~moead_framework.core.termination_criteria.max_evaluation.MaxEvaluation`}
        :param genetic_operator: Optional -- {:class:`~moead_framework.core.genetic_operator.abstract_operator.GeneticOperator`} The default operator depends on the problem type (combinatorial / numerical). This abstract constructor has no default of its own: a value of None is rejected with a TypeError.
        :param parent_selector: Optional -- {:class:`~moead_framework.core.parent_selector.abstract_parent_selector.ParentSelector`} The default operator depends on the number of solution required by the genetic operator. This abstract constructor has no default of its own: a value of None is rejected with a TypeError.
        :param mating_pool_selector: Optional -- {:class:`~moead_framework.core.selector.abstract_selector.MatingPoolSelector`} The default selector is {:class:`~moead_framework.core.selector.closest_neighbors_selector.ClosestNeighborsSelector`}
        :param sps_strategy: Optional -- {:class:`~moead_framework.core.sps_strategy.abstract_sps.SpsStrategy`} The default strategy is {:class:`~moead_framework.core.sps_strategy.sps_all.SpsAllSubproblems`}
        :param offspring_generator: Optional -- {:class:`~moead_framework.core.offspring_generator.abstract_mating.OffspringGenerator`} The default generator is {:class:`~moead_framework.core.offspring_generator.offspring_generator.OffspringGeneratorGeneric`}
        :param number_of_weight: Deprecated -- {integer} number of weight vector used to decompose the problem. Deprecated, remove in the next major release.
        :param number_of_objective: Deprecated -- {integer} number of objective in the problem. Deprecated, remove in the next major release.
        :raises TypeError: if an argument does not have the expected type (or is not a subclass of the expected component class)
        """
        import warnings

        if isinstance(problem, Problem):
            self.problem = problem
        else:
            raise TypeError("The expected type of `problem` is `Problem`")

        if not isinstance(weight_file, str):
            raise TypeError("The expected type of `weight_file` is `Str`")

        if self._is_subclass(aggregation_function, AggregationFunction):
            self.aggregation_function = aggregation_function()
        else:
            raise TypeError("The expected type of `aggregation_function` is `AggregationFunction`")

        # The two deprecated parameters are only reported; their values are not used:
        # both attributes are derived from the problem and the weight file below.
        if number_of_weight is not None:
            warnings.warn("The attribute number_of_weight is deprecated", DeprecationWarning)

        if number_of_objective is not None:
            warnings.warn("The attribute number_of_objective is deprecated", DeprecationWarning)

        # Components receive a reference to this (still partially built) instance.
        if termination_criteria is None:
            self.termination_criteria = MaxEvaluation(algorithm_instance=self)
        elif self._is_subclass(termination_criteria, TerminationCriteria):
            self.termination_criteria = termination_criteria(algorithm_instance=self)
        else:
            raise TypeError("The expected type of `termination_criteria` is `TerminationCriteria`")

        if isinstance(max_evaluation, int):
            self.max_evaluation = max_evaluation
        else:
            raise TypeError("The expected type of `max_evaluation` is `int`")

        self.number_of_objective = self.problem.number_of_objective

        if isinstance(number_of_weight_neighborhood, int):
            self.t = number_of_weight_neighborhood
        else:
            raise TypeError("The expected type of `number_of_weight_neighborhood` is `int`")

        # `ep` must exist before initial_population(), which fills it.
        self.ep = []
        self.weights = generate_weight_vectors(weight_file, shuffle=False)
        self.number_of_weight = len(self.weights)
        self.population = self.initial_population()
        # Weights are shuffled before the neighborhoods are computed, so `b` refers
        # to the shuffled order.
        random.shuffle(self.weights)
        self.z = self.init_z()
        self.b = self.generate_closest_weight_vectors()
        self.current_sub_problem = -1
        self.current_eval = 1
        self.mating_pool = []

        if sps_strategy is None:
            self.sps_strategy = SpsAllSubproblems(algorithm_instance=self)
        elif self._is_subclass(sps_strategy, SpsStrategy):
            self.sps_strategy = sps_strategy(algorithm_instance=self)
        else:
            raise TypeError("The expected type of `sps_strategy` is `SpsStrategy`")

        if mating_pool_selector is None:
            self.mating_pool_selector = ClosestNeighborsSelector(algorithm_instance=self)
        elif self._is_subclass(mating_pool_selector, MatingPoolSelector):
            self.mating_pool_selector = mating_pool_selector(algorithm_instance=self)
        else:
            raise TypeError("The expected type of `mating_pool_selector` is `MatingPoolSelector`")

        # The genetic operator is stored as a class, not instantiated here.
        if self._is_subclass(genetic_operator, GeneticOperator):
            self.genetic_operator = genetic_operator
        else:
            raise TypeError("The expected type of `genetic_operator` is `GeneticOperator`")

        if self._is_subclass(parent_selector, ParentSelector):
            self.parent_selector = parent_selector(algorithm=self)
        else:
            raise TypeError("The expected type of `parent_selector` is `ParentSelector`")

        if offspring_generator is None:
            self.offspring_generator = OffspringGeneratorGeneric(algorithm_instance=self)
        elif self._is_subclass(offspring_generator, OffspringGenerator):
            self.offspring_generator = offspring_generator(algorithm_instance=self)
        else:
            raise TypeError("The expected type of `offspring_generator` is `OffspringGenerator`")

    @staticmethod
    def _is_subclass(candidate, base):
        """
        Tell whether ``candidate`` is a class deriving from ``base``.

        Unlike a bare ``issubclass`` call, this returns False (instead of raising an
        unrelated TypeError) when ``candidate`` is not a class at all, e.g. None or
        an instance.

        :param candidate: {object} value to test
        :param base: {class} expected base class
        :return: {bool}
        """
        return isinstance(candidate, type) and issubclass(candidate, base)

    def run(self, checkpoint=None):
        """
        Execute the algorithm.

        :param checkpoint: {function} The default value is None. The checkpoint can be used to save data during the process. The function requires one parameter of type {:class:`~moead_framework.algorithm.abstract_moead.AbstractMoead`}. It is called before each sub-problem is optimized.
        :return: list<{:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`}> All non-dominated solutions found by the algorithm

        Example:

        >>> from moead_framework.algorithm.combinatorial import Moead
        >>> from moead_framework.tool.result import save_population
        >>> moead = Moead(...)
        >>>
        >>> def checkpoint(moead_algorithm: AbstractMoead):
        ...     if moead_algorithm.current_eval % 10 == 0 :
        ...         filename = "non_dominated_solutions-eval" + str(moead_algorithm.current_eval) + ".txt"
        ...         save_population(file_name=filename, population=moead_algorithm.ep)
        >>>
        >>> population = moead.run(checkpoint)
        """
        # The termination criteria is only tested between two passes over the
        # selected sub-problems, not after every single offspring.
        while self.termination_criteria.test():
            # For each sub-problem i
            for i in self.get_sub_problems_to_visit():
                if checkpoint is not None:
                    checkpoint(self)

                self.optimize_sub_problem(sub_problem_index=i)

        return self.ep

    def optimize_sub_problem(self, sub_problem_index):
        """
        Execute the process to optimize the sub-problem currently iterated:
        mating pool selection, offspring generation, repair, update of the reference
        point, update of the population/archive. ``current_eval`` is incremented by one.

        :param sub_problem_index: {integer} index of the sub-problem iterated
        :return:
        """
        self.update_current_sub_problem(sub_problem=sub_problem_index)
        # Work on a copy so later changes to the pool do not touch the selector's list.
        self.mating_pool = self.mating_pool_selection(sub_problem=sub_problem_index)[:]
        y = self.generate_offspring(population=self.mating_pool)
        y = self.repair(solution=y)
        self.update_z(solution=y)
        self.update_solutions(solution=y, aggregation_function=self.aggregation_function, sub_problem=sub_problem_index)
        self.current_eval += 1

    def update_solutions(self, solution, aggregation_function, sub_problem):
        """
        Update solutions of the population and of the external archive ep

        For every index j in the neighborhood ``b[sub_problem]``, the offspring replaces
        ``population[j]`` when ``aggregation_function.is_better(score of population[j], score of offspring)``
        is true. In that case the offspring is also added to ``ep`` unless it is a duplicate,
        and ``ep`` is filtered to keep non-dominated solutions.

        :param solution: {:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`} the candidate solution also called offspring
        :param aggregation_function: {:class:`~moead_framework.aggregation.functions.AggregationFunction`} Aggregation function used to compare solution in a multi-objective context
        :param sub_problem: {integer} index of the sub-problem currently visited
        :return:
        """
        for j in self.b[sub_problem]:
            y_score = aggregation_function.run(solution=solution,
                                               number_of_objective=self.number_of_objective,
                                               weights=self.weights,
                                               sub_problem=j,
                                               z=self.z)

            pop_j_score = aggregation_function.run(solution=self.population[j],
                                                   number_of_objective=self.number_of_objective,
                                                   weights=self.weights,
                                                   sub_problem=j,
                                                   z=self.z)

            if aggregation_function.is_better(pop_j_score, y_score):
                self.population[j] = solution

                if not is_duplicated(x=solution, population=self.ep, number_of_objective=self.number_of_objective):
                    self.ep.append(solution)
                    self.ep = get_non_dominated(self.ep)

    def get_sub_problems_to_visit(self):
        """
        Select sub-problems to visit for the next generation.
        This function calls the component :class:`~moead_framework.core.sps_strategy.abstract_sps.SpsStrategy`

        :return: {list} indexes of sub-problems
        """
        return self.sps_strategy.get_sub_problems()

    def mating_pool_selection(self, sub_problem):
        """
        Select the set of solutions where future parents solutions will be selected according to the current sub-problem visited.
        This function calls the component :class:`~moead_framework.core.selector.abstract_selector.MatingPoolSelector`

        :param sub_problem: {integer} index of the sub-problem currently visited
        :return: {list} indexes of sub-problems
        """
        return self.mating_pool_selector.select(sub_problem)

    def generate_offspring(self, population):
        """
        Generate a new offspring.
        This function calls the component :class:`~moead_framework.core.offspring_generator.abstract_mating.OffspringGenerator`

        :param population: {list} mating pool used to generate the offspring; it is forwarded to the generator as ``population_indexes``
        :return: {:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`} offspring
        """
        return self.offspring_generator.run(population_indexes=population)

    def repair(self, solution):
        """
        Repair the solution in parameter.
        The default implementation returns the solution unchanged.

        :param solution: {:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`}
        :return: {:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`} the repaired solution
        """
        return solution

    def update_current_sub_problem(self, sub_problem):
        """
        Update the attribute current_sub_problem

        :param sub_problem: {integer} index of sub-problem
        :return:
        """
        self.current_sub_problem = sub_problem

    def initial_population(self):
        """
        Initialize the population of solution: one random solution per weight vector.
        Each generated solution is also inserted in the archive ``ep`` (unless duplicated),
        which is then filtered to keep non-dominated solutions.

        :return: {List<:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`>}
        """
        p = []
        for _ in range(self.number_of_weight):
            x_i = self.problem.generate_random_solution()
            p.append(x_i)

            if not is_duplicated(x=x_i, population=self.ep, number_of_objective=self.number_of_objective):
                self.ep.append(x_i)
                self.ep = get_non_dominated(self.ep)

        return p

    def init_z(self):
        """
        Initialize the reference point z.

        z starts at the origin and each coordinate is lowered to the smallest value of
        the corresponding objective found in the population, so no coordinate of the
        returned point is ever greater than 0.

        :return: {numpy.ndarray} reference point, one coordinate per objective
        """
        # NOTE(review): with strictly positive objective values z stays at 0 rather
        # than at the best observed value -- confirm this is the intended convention.
        z = np.zeros(self.number_of_objective)

        for i in range(self.number_of_weight):
            objectives = self.population[i].F
            for j in range(self.number_of_objective):
                f_j = objectives[j]

                if z[j] > f_j:  # in minimisation context !
                    z[j] = f_j

        return z

    def update_z(self, solution):
        """
        Update the reference point z with coordinates of the solution in parameter if coordinates are better.

        :param solution: :class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution` solution used to update the reference point
        :return:
        """
        for i in range(self.number_of_objective):
            if self.z[i] > solution.F[i]:  # in minimisation context !
                self.z[i] = solution.F[i]

    def generate_closest_weight_vectors(self):
        """
        Generate all neighborhood for each solution in the population

        For each weight vector, the indexes of the ``t`` closest weight vectors
        (Euclidean distance, the vector itself included) are collected. The indexes of
        one neighborhood are not sorted by distance. When ``t`` is larger than the number
        of weight vectors, each neighborhood simply contains every sub-problem once.

        :return: {list<List<Integer>>} List of sub-problem (neighborhood) for each solution
        """
        # A neighborhood cannot hold more distinct sub-problems than there are weight
        # vectors; without this bound the unused slots would all point to index 0.
        size = min(self.t, self.number_of_weight)
        # Convert once instead of at every distance computation.
        vectors = [np.array(w) for w in self.weights]

        b = []
        for i in range(self.number_of_weight):
            b_i = np.zeros(size, int)
            b_dist = np.full(size, np.inf)
            for j in range(self.number_of_weight):
                dist_wi_wj = np.linalg.norm(vectors[i] - vectors[j])
                if dist_wi_wj < np.max(b_dist):
                    index_to_replace = np.argmax(b_dist)  # replace the worst distance
                    b_dist[index_to_replace] = dist_wi_wj
                    b_i[index_to_replace] = j

            b.append(b_i.tolist())

        return b