ml-finance-python

python scripts for finance machine learning

git clone https://9o.is/git/ml-finance-python.git

particle_swarm_optimization.py

(5985B)


      1 from __future__ import print_function, division
      2 import numpy as np
      3 import copy
      4 
      5 class ParticleSwarmOptimizedNN():
      6     """ Particle Swarm Optimization of Neural Network.
      7 
      8     Parameters:
      9     -----------
     10     n_individuals: int
     11         The number of neural networks that are allowed in the population at a time.
     12     model_builder: method
     13         A method which returns a user specified NeuralNetwork instance.
     14     inertia_weight:     float [0,1)
     15     cognitive_weight:   float [0,1)
     16     social_weight:      float [0,1)
     17     max_velocity: float
     18         The maximum allowed value for the velocity.
     19 
     20     Reference:
     21         Neural Network Training Using Particle Swarm Optimization
     22         https://visualstudiomagazine.com/articles/2013/12/01/neural-network-training-using-particle-swarm-optimization.aspx 
     23     """
     24     def __init__(self, population_size, 
     25                         model_builder, 
     26                         inertia_weight=0.8, 
     27                         cognitive_weight=2, 
     28                         social_weight=2, 
     29                         max_velocity=20):
     30         self.population_size = population_size
     31         self.model_builder = model_builder
     32         self.best_individual = None
     33         # Parameters used to update velocity
     34         self.cognitive_w = cognitive_weight
     35         self.inertia_w = inertia_weight
     36         self.social_w = social_weight
     37         self.min_v = -max_velocity
     38         self.max_v = max_velocity
     39 
     40     def _build_model(self, id):
     41         """ Returns a new individual """
     42         model = self.model_builder(n_inputs=self.X.shape[1], n_outputs=self.y.shape[1])
     43         model.id = id
     44         model.fitness = 0
     45         model.highest_fitness = 0
     46         model.accuracy = 0
     47         # Set intial best as the current initialization
     48         model.best_layers = copy.copy(model.layers)
     49 
     50         # Set initial velocity to zero
     51         model.velocity = []
     52         for layer in model.layers:
     53             velocity = {"W": 0, "w0": 0}
     54             if hasattr(layer, 'W'):
     55                 velocity = {"W": np.zeros_like(layer.W), "w0": np.zeros_like(layer.w0)}
     56             model.velocity.append(velocity)
     57 
     58         return model
     59 
     60     def _initialize_population(self):
     61         """ Initialization of the neural networks forming the population"""
     62         self.population = []
     63         for i in range(self.population_size):
     64             model = self._build_model(id=i)
     65             self.population.append(model)
     66 
     67     def _update_weights(self, individual):
     68         """ Calculate the new velocity and update weights for each layer """
     69         # Two random parameters used to update the velocity
     70         r1 = np.random.uniform()
     71         r2 = np.random.uniform()
     72         for i, layer in enumerate(individual.layers):
     73             if hasattr(layer, 'W'):
     74                 # Layer weights velocity
     75                 first_term_W = self.inertia_w * individual.velocity[i]["W"]
     76                 second_term_W = self.cognitive_w * r1 * (individual.best_layers[i].W - layer.W)
     77                 third_term_W = self.social_w * r2 * (self.best_individual.layers[i].W - layer.W)
     78                 new_velocity = first_term_W + second_term_W + third_term_W
     79                 individual.velocity[i]["W"] = np.clip(new_velocity, self.min_v, self.max_v)
     80 
     81                 # Bias weight velocity
     82                 first_term_w0 = self.inertia_w * individual.velocity[i]["w0"]
     83                 second_term_w0 = self.cognitive_w * r1 * (individual.best_layers[i].w0 - layer.w0)
     84                 third_term_w0 = self.social_w * r2 * (self.best_individual.layers[i].w0 - layer.w0)
     85                 new_velocity = first_term_w0 + second_term_w0 + third_term_w0
     86                 individual.velocity[i]["w0"] = np.clip(new_velocity, self.min_v, self.max_v)
     87 
     88                 # Update layer weights with velocity
     89                 individual.layers[i].W += individual.velocity[i]["W"]
     90                 individual.layers[i].w0 += individual.velocity[i]["w0"]
     91         
     92     def _calculate_fitness(self, individual):
     93         """ Evaluate the individual on the test set to get fitness scores """
     94         loss, acc = individual.test_on_batch(self.X, self.y)
     95         individual.fitness = 1 / (loss + 1e-8)
     96         individual.accuracy = acc
     97 
     98     def evolve(self, X, y, n_generations):
     99         """ Will evolve the population for n_generations based on dataset X and labels y"""
    100         self.X, self.y = X, y
    101 
    102         self._initialize_population()
    103 
    104         # The best individual of the population is initialized as population's first ind.
    105         self.best_individual = copy.copy(self.population[0])
    106 
    107         for epoch in range(n_generations):
    108             for individual in self.population:
    109                 # Calculate new velocity and update the NN weights
    110                 self._update_weights(individual)
    111                 # Calculate the fitness of the updated individual
    112                 self._calculate_fitness(individual)
    113 
    114                 # If the current fitness is higher than the individual's previous highest
    115                 # => update the individual's best layer setup
    116                 if individual.fitness > individual.highest_fitness:
    117                     individual.best_layers = copy.copy(individual.layers)
    118                     individual.highest_fitness = individual.fitness
    119                 # If the individual's fitness is higher than the highest recorded fitness for the
    120                 # whole population => update the best individual
    121                 if individual.fitness > self.best_individual.fitness:
    122                     self.best_individual = copy.copy(individual)
    123 
    124             print ("[%d Best Individual - ID: %d Fitness: %.5f, Accuracy: %.1f%%]" % (epoch,
    125                                                                             self.best_individual.id,
    126                                                                             self.best_individual.fitness,
    127                                                                             100*float(self.best_individual.accuracy)))
    128         return self.best_individual
    129