Source code for moead_framework.tool.result

from typing import List
import random
import numpy as np

from moead_framework.algorithm import AbstractMoead
from moead_framework.solution.base import Solution


[docs]def save_population(file_name, population): """ Save the objective values of each solution in the population :param file_name: string :param population: list<{:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`}> :return: """ if type(population) != list: raise TypeError( f"the parameter 'population' of save_population(population) must be a list. Instead, we have {type(population)} ") file = open(file_name, "w") # file.write("Length of the list = " + str(len(population)) + "\n") for s in population: if not isinstance(s, Solution): file.close() raise TypeError( f"the parameter 'population' of save_population(population) must be a list of Solution. Instead, we have a list of {type(s)} ") row = "" for coordinate in s.F: row += str(coordinate) + " " row = row[:-1] + "\n" file.write(row) file.close()
[docs]def save_population_full(file_name, population): """ Save the objective vector and the decision vector of each solution in the population :param file_name: string :param population: list<{:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`}> :return: """ if type(population) != list: raise TypeError( f"the parameter 'population' of save_population_full(population) must be a list. Instead, we have {type(population)} ") file = open(file_name, "w") # file.write("Length of the list = " + str(len(population)) + "\n") for s in population: if not isinstance(s, Solution): file.close() raise TypeError( f"the parameter 'population' of save_population_full(population) must be a list of Solution. Instead, we have a list of {type(s)} ") row = "" x = "" for coordinate in s.F: row += str(coordinate) + " " row += str(s.decision_vector.tolist()) row = row + "\n" file.write(row) file.close()
def set_seed(seed): """ Set the seed for numpy and random packages. Warning, this approach is not safe in threaded environments. By re-using a seed value, the same sequence should be reproducible from run to run as long as multiple threads are not running (https://docs.python.org/3/library/random.html#notes-on-reproducibility) :param seed: integer :return: """ random.seed(seed) np.random.seed(seed)
[docs]def checkpoint(moead_algorithm: AbstractMoead): """ Minimalist checkpoint function to save all non dominated point every 10 evaluations :param moead_algorithm: {:class:`~moead_framework.algorithm.abstract_moead.AbstractMoead`} instance of the algorithm that uses the checkpoint :return: """ if moead_algorithm.current_eval % 10 == 0 : filename = "non_dominated_solutions-eval" + str(moead_algorithm.current_eval) + ".txt" save_population(file_name=filename, population=moead_algorithm.ep)
[docs]def compute_hypervolume(solutions, ref_point): """ Compute the Hypervolume between the ref_point and objective values of each solution :param solutions: list<{:class:`~moead_framework.solution.one_dimension_solution.OneDimensionSolution`}> :param ref_point: list<float> :return: float : hypervolume value """ if type(solutions) != list: raise TypeError( f"the parameter 'solutions' of compute_hypervolume() must be a list. Instead, we have {type(solutions)} ") if type(ref_point) != list: raise TypeError( f"the parameter 'ref_point' of compute_hypervolume() must be a list. Instead, we have {type(solutions)} ") arr = [] for s in solutions: if isinstance(s, Solution): arr.append(s.F) else: raise TypeError( f"the parameter 'solutions' of compute_hypervolume must be a list of Solution. Instead, we have a list of {type(s)} ") if len(ref_point) != len(arr[0]): raise TypeError( f"the number of items in 'ref_point' must be equal to {len(arr[0])}. Instead, we have a list of size {len(ref_point)}.") hypervolume_class = HyperVolume(reference_point=ref_point) x = [] for s in solutions: x.append(s.F[:]) return hypervolume_class.compute(solutions=np.array(x))
class HyperVolume: """ Code from https://github.com/jMetal/jMetalPy/blob/master/jmetal/core/quality_indicator.py Hypervolume computation based on variant 3 of the algorithm in the paper: * C. M. Fonseca, L. Paquete, and M. Lopez-Ibanez. An improved dimension-sweep algorithm for the hypervolume indicator. In IEEE Congress on Evolutionary Computation, pages 1157-1163, Vancouver, Canada, July 2006. Minimization is implicitly assumed here! """ def __init__(self, reference_point: List[float]): self.referencePoint = reference_point self.list: MultiList = MultiList(0) def compute(self, solutions: List): """Before the HV computation, front and reference point are translated, so that the reference point is [0, ..., 0]. :return: The hypervolume that is dominated by a non-dominated front. """ front = solutions def weakly_dominates(point, other): for i in range(len(point)): if point[i] > other[i]: return False return True relevant_points = [] reference_point = self.referencePoint dimensions = len(reference_point) for point in front: # only consider points that dominate the reference point if weakly_dominates(point, reference_point): relevant_points.append(point) if any(reference_point): # shift points so that reference_point == [0, ..., 0] # this way the reference point doesn't have to be explicitly used # in the HV computation for j in range(len(relevant_points)): relevant_points[j] = [relevant_points[j][i] - reference_point[i] for i in range(dimensions)] self._pre_process(relevant_points) bounds = [-1.0e308] * dimensions return self._hv_recursive(dimensions - 1, len(relevant_points), bounds) def _hv_recursive(self, dim_index: int, length: int, bounds: list): """Recursive call to hypervolume calculation. In contrast to the paper, the code assumes that the reference point is [0, ..., 0]. This allows the avoidance of a few operations. """ hvol = 0.0 sentinel = self.list.sentinel if length == 0: return hvol elif dim_index == 0: # special case: only one dimension # why using hypervolume at all? return -sentinel.next[0].cargo[0] elif dim_index == 1: # special case: two dimensions, end recursion q = sentinel.next[1] h = q.cargo[0] p = q.next[1] while p is not sentinel: p_cargo = p.cargo hvol += h * (q.cargo[1] - p_cargo[1]) if p_cargo[0] < h: h = p_cargo[0] q = p p = q.next[1] hvol += h * q.cargo[1] return hvol else: remove = self.list.remove reinsert = self.list.reinsert hv_recursive = self._hv_recursive p = sentinel q = p.prev[dim_index] while q.cargo is not None: if q.ignore < dim_index: q.ignore = 0 q = q.prev[dim_index] q = p.prev[dim_index] while length > 1 and ( q.cargo[dim_index] > bounds[dim_index] or q.prev[dim_index].cargo[dim_index] >= bounds[dim_index]): p = q remove(p, dim_index, bounds) q = p.prev[dim_index] length -= 1 q_area = q.area q_cargo = q.cargo q_prev_dim_index = q.prev[dim_index] if length > 1: hvol = q_prev_dim_index.volume[dim_index] + q_prev_dim_index.area[dim_index] * ( q_cargo[dim_index] - q_prev_dim_index.cargo[dim_index]) else: q_area[0] = 1 q_area[1:dim_index + 1] = [q_area[i] * -q_cargo[i] for i in range(dim_index)] q.volume[dim_index] = hvol if q.ignore >= dim_index: q_area[dim_index] = q_prev_dim_index.area[dim_index] else: q_area[dim_index] = hv_recursive(dim_index - 1, length, bounds) if q_area[dim_index] <= q_prev_dim_index.area[dim_index]: q.ignore = dim_index while p is not sentinel: p_cargo_dim_index = p.cargo[dim_index] hvol += q.area[dim_index] * (p_cargo_dim_index - q.cargo[dim_index]) bounds[dim_index] = p_cargo_dim_index reinsert(p, dim_index, bounds) length += 1 q = p p = p.next[dim_index] q.volume[dim_index] = hvol if q.ignore >= dim_index: q.area[dim_index] = q.prev[dim_index].area[dim_index] else: q.area[dim_index] = hv_recursive(dim_index - 1, length, bounds) if q.area[dim_index] <= q.prev[dim_index].area[dim_index]: q.ignore = dim_index hvol -= q.area[dim_index] * q.cargo[dim_index] return hvol def _pre_process(self, front): """Sets up the list front structure needed for calculation.""" dimensions = len(self.referencePoint) node_list = MultiList(dimensions) nodes = [MultiList.Node(dimensions, point) for point in front] for i in range(dimensions): self._sort_by_dimension(nodes, i) node_list.extend(nodes, i) self.list = node_list def _sort_by_dimension(self, nodes, i): """Sorts the list of nodes by the i-th value of the contained points.""" # build a list of tuples of (point[i], node) decorated = [(node.cargo[i], node) for node in nodes] # sort by this value decorated.sort(key=lambda n: n[0]) # write back to original list nodes[:] = [node for (_, node) in decorated] class MultiList: """A special front structure needed by FonsecaHyperVolume. It consists of several doubly linked lists that share common nodes. So, every node has multiple predecessors and successors, one in every list. """ class Node: def __init__(self, number_lists, cargo=None): self.cargo = cargo self.next = [None] * number_lists self.prev = [None] * number_lists self.ignore = 0 self.area = [0.0] * number_lists self.volume = [0.0] * number_lists def __init__(self, number_lists): """ Builds 'numberLists' doubly linked lists. """ self.number_lists = number_lists self.sentinel = MultiList.Node(number_lists) self.sentinel.next = [self.sentinel] * number_lists self.sentinel.prev = [self.sentinel] * number_lists def __len__(self): """Returns the number of lists that are included in this MultiList.""" return self.number_lists def append(self, node, index): """ Appends a node to the end of the list at the given index.""" last_but_one = self.sentinel.prev[index] node.next[index] = self.sentinel node.prev[index] = last_but_one # set the last element as the new one self.sentinel.prev[index] = node last_but_one.next[index] = node def extend(self, nodes, index): """ Extends the list at the given index with the nodes.""" sentinel = self.sentinel for node in nodes: last_but_one = sentinel.prev[index] node.next[index] = sentinel node.prev[index] = last_but_one # set the last element as the new one sentinel.prev[index] = node last_but_one.next[index] = node def remove(self, node, index, bounds): """ Removes and returns 'node' from all lists in [0, 'index'[.""" for i in range(index): predecessor = node.prev[i] successor = node.next[i] predecessor.next[i] = successor successor.prev[i] = predecessor if bounds[i] > node.cargo[i]: bounds[i] = node.cargo[i] return node def reinsert(self, node, index, bounds): """ Inserts 'node' at the position it had in all lists in [0, 'index'[ before it was removed. This method assumes that the next and previous nodes of the node that is reinserted are in the list. """ for i in range(index): node.prev[i].next[i] = node node.next[i].prev[i] = node if bounds[i] > node.cargo[i]: bounds[i] = node.cargo[i]