ml-finance-python
Python scripts for finance machine learning
git clone https://9o.is/git/ml-finance-python.git
dbscan.py
(4067B)
1 from __future__ import print_function, division
2 import numpy as np
3 from mlfromscratch.utils import Plot, euclidean_distance, normalize
4
5
class DBSCAN():
    """A density based clustering method that expands clusters from
    samples that have at least min_samples neighbors within a radius
    specified by eps.

    Parameters:
    -----------
    eps: float
        The radius within which samples are considered neighbors
        (the distance must be strictly smaller than eps).
    min_samples: int
        The number of neighbors (the sample itself excluded) required for
        the sample to be a core point.
    """
    def __init__(self, eps=1, min_samples=5):
        self.eps = eps
        self.min_samples = min_samples

    def _get_neighbors(self, sample_i):
        """ Return an array of indexes (into self.X) of neighboring samples.
        A sample_2 is considered a neighbor of sample_1 if the euclidean
        distance between them is smaller than epsilon. The sample itself is
        never part of its own neighborhood. """
        # Distance from sample_i to every sample (rows of self.X) at once
        distances = np.linalg.norm(self.X - self.X[sample_i], axis=1)
        is_neighbor = distances < self.eps
        # Exclude the sample itself while keeping the indexes aligned with
        # self.X (filtering the sample out first would shift every index
        # after sample_i down by one)
        is_neighbor[sample_i] = False
        return np.flatnonzero(is_neighbor)

    def _mark_visited(self, sample_i):
        """ Record sample_i as visited, both in the ordered list and in the
        set that is used for constant time membership tests """
        self.visited_samples.append(sample_i)
        self._visited.add(sample_i)

    def _expand_cluster(self, sample_i, neighbors):
        """ Expand the cluster from the core point sample_i until we have
        reached the border of the dense area (density determined by eps and
        min_samples). Returns the list of sample indexes in the cluster,
        starting with sample_i, in depth first visiting order. """
        cluster = [sample_i]
        # Explicit stack of neighborhood iterators instead of recursion, so
        # that long chains of core points can't exceed the recursion limit
        stack = [iter(neighbors)]
        while stack:
            for neighbor_i in stack[-1]:
                neighbor_i = int(neighbor_i)
                if neighbor_i in self._visited:
                    continue
                self._mark_visited(neighbor_i)
                # Fetch the sample's distant neighbors (neighbors of neighbor)
                distant_neighbors = self._get_neighbors(neighbor_i)
                self.neighbors[neighbor_i] = distant_neighbors
                # Core and border points alike belong to the cluster
                cluster.append(neighbor_i)
                # If the neighbor has at least min_samples neighbors it is a
                # core point => continue the expansion from it before
                # resuming the current neighborhood
                if len(distant_neighbors) >= self.min_samples:
                    stack.append(iter(distant_neighbors))
                    break
            else:
                # Neighborhood on top of the stack is exhausted
                stack.pop()
        return cluster

    def _get_cluster_labels(self):
        """ Return the samples labels as the index of the cluster in which they are
        contained. Samples that belong to no cluster (outliers) all get the
        label len(self.clusters). """
        # Set default value to number of clusters
        # Will make sure all outliers have same cluster label
        labels = np.full(shape=self.X.shape[0], fill_value=len(self.clusters))
        for cluster_i, cluster in enumerate(self.clusters):
            labels[cluster] = cluster_i
        return labels

    # DBSCAN
    def predict(self, X):
        """ Cluster the samples in X and return an array with one cluster
        label per sample.

        Parameters:
        -----------
        X: array-like of shape (n_samples, n_features)
            The samples to cluster.

        Returns:
        --------
        Array of length n_samples. Clusters are labeled 0, 1, ... in the
        order they are found; outliers get the label len(self.clusters).
        """
        self.X = np.asarray(X)
        self.clusters = []
        self.visited_samples = []
        # Same content as visited_samples, kept as a set for fast lookups
        self._visited = set()
        self.neighbors = {}
        n_samples = np.shape(self.X)[0]
        # Iterate through samples and expand clusters from them
        # if they have at least self.min_samples neighbors
        for sample_i in range(n_samples):
            if sample_i in self._visited:
                continue
            self.neighbors[sample_i] = self._get_neighbors(sample_i)
            if len(self.neighbors[sample_i]) >= self.min_samples:
                # If core point => mark as visited. Non core points are left
                # unvisited so a later cluster can still claim them as
                # border points
                self._mark_visited(sample_i)
                # Sample is a core point => expand cluster from sample
                new_cluster = self._expand_cluster(
                    sample_i, self.neighbors[sample_i])
                # Add cluster to list of clusters
                self.clusters.append(new_cluster)

        # Get the resulting cluster labels
        cluster_labels = self._get_cluster_labels()
        return cluster_labels