ml-finance-python

Python scripts for machine learning in finance

git clone https://9o.is/git/ml-finance-python.git

gaussian_mixture_model.py

(4723B)


      1 from __future__ import division, print_function
      2 import math
      3 from sklearn import datasets
      4 import numpy as np
      5 
      6 from mlfromscratch.utils import normalize, euclidean_distance, calculate_covariance_matrix
      7 from mlfromscratch.utils import Plot
      8 
      9 
     10 class GaussianMixtureModel():
     11     """A probabilistic clustering method for determining groupings among data samples.
     12 
     13     Parameters:
     14     -----------
     15     k: int
     16         The number of clusters the algorithm will form.
     17     max_iterations: int
     18         The number of iterations the algorithm will run for if it does
     19         not converge before that. 
     20     tolerance: float
     21         If the difference of the results from one iteration to the next is
     22         smaller than this value we will say that the algorithm has converged.
     23     """
     24     def __init__(self, k=2, max_iterations=2000, tolerance=1e-8):
     25         self.k = k
     26         self.parameters = []
     27         self.max_iterations = max_iterations
     28         self.tolerance = tolerance
     29         self.responsibilities = []
     30         self.sample_assignments = None
     31         self.responsibility = None
     32 
     33     def _init_random_gaussians(self, X):
     34         """ Initialize gaussian randomly """
     35         n_samples = np.shape(X)[0]
     36         self.priors = (1 / self.k) * np.ones(self.k)
     37         for i in range(self.k):
     38             params = {}
     39             params["mean"] = X[np.random.choice(range(n_samples))]
     40             params["cov"] = calculate_covariance_matrix(X)
     41             self.parameters.append(params)
     42 
     43     def multivariate_gaussian(self, X, params):
     44         """ Likelihood """
     45         n_features = np.shape(X)[1]
     46         mean = params["mean"]
     47         covar = params["cov"]
     48         determinant = np.linalg.det(covar)
     49         likelihoods = np.zeros(np.shape(X)[0])
     50         for i, sample in enumerate(X):
     51             d = n_features  # dimension
     52             coeff = (1.0 / (math.pow((2.0 * math.pi), d / 2)
     53                             * math.sqrt(determinant)))
     54             exponent = math.exp(-0.5 * (sample - mean).T.dot(np.linalg.pinv(covar)).dot((sample - mean)))
     55             likelihoods[i] = coeff * exponent
     56 
     57         return likelihoods
     58 
     59     def _get_likelihoods(self, X):
     60         """ Calculate the likelihood over all samples """
     61         n_samples = np.shape(X)[0]
     62         likelihoods = np.zeros((n_samples, self.k))
     63         for i in range(self.k):
     64             likelihoods[
     65                 :, i] = self.multivariate_gaussian(
     66                 X, self.parameters[i])
     67         return likelihoods
     68 
     69     def _expectation(self, X):
     70         """ Calculate the responsibility """
     71         # Calculate probabilities of X belonging to the different clusters
     72         weighted_likelihoods = self._get_likelihoods(X) * self.priors
     73         sum_likelihoods = np.expand_dims(
     74             np.sum(weighted_likelihoods, axis=1), axis=1)
     75         # Determine responsibility as P(X|y)*P(y)/P(X)
     76         self.responsibility = weighted_likelihoods / sum_likelihoods
     77         # Assign samples to cluster that has largest probability
     78         self.sample_assignments = self.responsibility.argmax(axis=1)
     79         # Save value for convergence check
     80         self.responsibilities.append(np.max(self.responsibility, axis=1))
     81 
     82     def _maximization(self, X):
     83         """ Update the parameters and priors """
     84         # Iterate through clusters and recalculate mean and covariance
     85         for i in range(self.k):
     86             resp = np.expand_dims(self.responsibility[:, i], axis=1)
     87             mean = (resp * X).sum(axis=0) / resp.sum()
     88             covariance = (X - mean).T.dot((X - mean) * resp) / resp.sum()
     89             self.parameters[i]["mean"], self.parameters[
     90                 i]["cov"] = mean, covariance
     91 
     92         # Update weights
     93         n_samples = np.shape(X)[0]
     94         self.priors = self.responsibility.sum(axis=0) / n_samples
     95 
     96     def _converged(self, X):
     97         """ Covergence if || likehood - last_likelihood || < tolerance """
     98         if len(self.responsibilities) < 2:
     99             return False
    100         diff = np.linalg.norm(
    101             self.responsibilities[-1] - self.responsibilities[-2])
    102         # print ("Likelihood update: %s (tol: %s)" % (diff, self.tolerance))
    103         return diff <= self.tolerance
    104 
    105     def predict(self, X):
    106         """ Run GMM and return the cluster indices """
    107         # Initialize the gaussians randomly
    108         self._init_random_gaussians(X)
    109 
    110         # Run EM until convergence or for max iterations
    111         for _ in range(self.max_iterations):
    112             self._expectation(X)    # E-step
    113             self._maximization(X)   # M-step
    114 
    115             # Check convergence
    116             if self._converged(X):
    117                 break
    118 
    119         # Make new assignments and return them
    120         self._expectation(X)
    121         return self.sample_assignments