ml-finance-python
Python scripts for finance machine learning
git clone https://9o.is/git/ml-finance-python.git
gaussian_mixture_model.py
(4723B)
1 from __future__ import division, print_function
2 import math
3 from sklearn import datasets
4 import numpy as np
5
6 from mlfromscratch.utils import normalize, euclidean_distance, calculate_covariance_matrix
7 from mlfromscratch.utils import Plot
8
9
class GaussianMixtureModel():
    """A probabilistic clustering method for determining groupings among data samples.

    Every cluster is described by a multivariate Gaussian (a mean vector and a
    covariance matrix) plus a prior weight.  The Gaussians are fitted with the
    expectation-maximization (EM) algorithm when `predict` is called.

    Parameters:
    -----------
    k: int
        The number of clusters the algorithm will form.
    max_iterations: int
        The number of iterations the algorithm will run for if it does
        not converge before that.
    tolerance: float
        If the difference of the results from one iteration to the next is
        smaller than (or equal to) this value we will say that the algorithm
        has converged.
    """
    def __init__(self, k=2, max_iterations=2000, tolerance=1e-8):
        self.k = k
        self.max_iterations = max_iterations
        self.tolerance = tolerance
        self._reset_state()

    def _reset_state(self):
        """ Forget everything learned or recorded by a previous run """
        # One {"mean": ..., "cov": ...} dict per cluster
        self.parameters = []
        # Per-iteration history of each sample's largest responsibility;
        # only used by the convergence check
        self.responsibilities = []
        # Cluster index per sample from the latest E-step
        self.sample_assignments = None
        # (n_samples, k) responsibility matrix from the latest E-step
        self.responsibility = None

    def _init_random_gaussians(self, X):
        """ Initialize gaussian randomly

        Every cluster starts with equal prior weight, a randomly picked
        sample of X as its mean, and the matrix returned by
        calculate_covariance_matrix(X) as its covariance (presumably the
        covariance of the whole data set -- the helper lives in
        mlfromscratch.utils).
        """
        n_samples = np.shape(X)[0]
        self.priors = (1.0 / self.k) * np.ones(self.k)
        # Build a fresh list instead of appending so that parameters left
        # over from an earlier run can never shadow the new ones.
        self.parameters = [
            {
                "mean": X[np.random.choice(n_samples)],
                "cov": calculate_covariance_matrix(X),
            }
            for _ in range(self.k)
        ]

    def multivariate_gaussian(self, X, params):
        """ Likelihood

        Evaluate the multivariate normal density described by `params`
        (a dict with the keys "mean" and "cov") at every row of X.

        Returns a 1-D array with one density value per sample.  A covariance
        matrix whose determinant is zero makes the normalisation constant
        undefined and raises ZeroDivisionError.
        """
        n_features = np.shape(X)[1]
        mean = params["mean"]
        covar = params["cov"]
        determinant = np.linalg.det(covar)
        # The normalisation constant and the (pseudo-)inverse covariance do
        # not depend on the sample, so compute them once for the whole batch.
        coeff = 1.0 / (math.pow(2.0 * math.pi, n_features / 2.0)
                       * math.sqrt(determinant))
        precision = np.linalg.pinv(covar)
        centered = np.asarray(X) - mean
        # Row-wise quadratic form (x - mean)^T . precision . (x - mean)
        mahalanobis_sq = np.sum(centered.dot(precision) * centered, axis=1)
        return coeff * np.exp(-0.5 * mahalanobis_sq)

    def _get_likelihoods(self, X):
        """ Calculate the likelihood over all samples

        Returns an (n_samples, k) array whose column i holds the density of
        every sample under cluster i.
        """
        n_samples = np.shape(X)[0]
        likelihoods = np.zeros((n_samples, self.k))
        for i in range(self.k):
            likelihoods[:, i] = self.multivariate_gaussian(
                X, self.parameters[i])
        return likelihoods

    def _expectation(self, X):
        """ Calculate the responsibility (E-step)

        Updates self.responsibility and self.sample_assignments and appends
        the per-sample maximum responsibility to self.responsibilities.
        """
        # Calculate probabilities of X belonging to the different clusters
        weighted_likelihoods = self._get_likelihoods(X) * self.priors
        sum_likelihoods = np.sum(weighted_likelihoods, axis=1, keepdims=True)
        # Determine responsibility as P(X|y)*P(y)/P(X)
        self.responsibility = weighted_likelihoods / sum_likelihoods
        # Assign samples to cluster that has largest probability
        self.sample_assignments = self.responsibility.argmax(axis=1)
        # Save value for convergence check
        self.responsibilities.append(np.max(self.responsibility, axis=1))

    def _maximization(self, X):
        """ Update the parameters and priors (M-step) """
        # Iterate through clusters and recalculate mean and covariance,
        # weighting every sample by its responsibility for that cluster
        for i in range(self.k):
            resp = np.expand_dims(self.responsibility[:, i], axis=1)
            total = resp.sum()
            mean = (resp * X).sum(axis=0) / total
            centered = X - mean
            covariance = centered.T.dot(centered * resp) / total
            self.parameters[i]["mean"] = mean
            self.parameters[i]["cov"] = covariance

        # Update weights
        n_samples = np.shape(X)[0]
        self.priors = self.responsibility.sum(axis=0) / n_samples

    def _converged(self, X):
        """ Convergence if || likelihood - last_likelihood || <= tolerance

        Compares the per-sample maximum responsibilities recorded by the two
        most recent E-steps.  X is unused; it is kept in the signature for
        backward compatibility.
        """
        if len(self.responsibilities) < 2:
            return False
        diff = np.linalg.norm(
            self.responsibilities[-1] - self.responsibilities[-2])
        return diff <= self.tolerance

    def predict(self, X):
        """ Run GMM and return the cluster indices

        Every call fits the model from scratch on X and returns an array
        with the index of the most probable cluster for each sample.
        """
        # Drop state from any earlier call: stale convergence history would
        # otherwise be compared against this run (a false early stop, or a
        # shape error when the number of samples differs).
        self._reset_state()

        # Initialize the gaussians randomly
        self._init_random_gaussians(X)

        # Run EM until convergence or for max iterations
        for _ in range(self.max_iterations):
            self._expectation(X)    # E-step
            self._maximization(X)   # M-step

            # Check convergence
            if self._converged(X):
                break

        # Make new assignments and return them
        self._expectation(X)
        return self.sample_assignments