ml-finance-python

python scripts for finance machine learning

git clone https://9o.is/git/ml-finance-python.git

dbscan.py

(4067B)


      1 from __future__ import print_function, division
      2 import numpy as np
      3 from mlfromscratch.utils import Plot, euclidean_distance, normalize
      4 
      5 
      6 class DBSCAN():
      7     """A density based clustering method that expands clusters from 
      8     samples that have more neighbors within a radius specified by eps
      9     than the value min_samples.
     10 
     11     Parameters:
     12     -----------
     13     eps: float
     14         The radius within which samples are considered neighbors
     15     min_samples: int
     16         The number of neighbors required for the sample to be a core point. 
     17     """
     18     def __init__(self, eps=1, min_samples=5):
     19         self.eps = eps
     20         self.min_samples = min_samples
     21 
     22     def _get_neighbors(self, sample_i):
     23         """ Return a list of indexes of neighboring samples
     24         A sample_2 is considered a neighbor of sample_1 if the distance between
     25         them is smaller than epsilon """
     26         neighbors = []
     27         idxs = np.arange(len(self.X))
     28         for i, _sample in enumerate(self.X[idxs != sample_i]):
     29             distance = euclidean_distance(self.X[sample_i], _sample)
     30             if distance < self.eps:
     31                 neighbors.append(i)
     32         return np.array(neighbors)
     33 
     34     def _expand_cluster(self, sample_i, neighbors):
     35         """ Recursive method which expands the cluster until we have reached the border
     36         of the dense area (density determined by eps and min_samples) """
     37         cluster = [sample_i]
     38         # Iterate through neighbors
     39         for neighbor_i in neighbors:
     40             if not neighbor_i in self.visited_samples:
     41                 self.visited_samples.append(neighbor_i)
     42                 # Fetch the sample's distant neighbors (neighbors of neighbor)
     43                 self.neighbors[neighbor_i] = self._get_neighbors(neighbor_i)
     44                 # Make sure the neighbor's neighbors are more than min_samples
     45                 # (If this is true the neighbor is a core point)
     46                 if len(self.neighbors[neighbor_i]) >= self.min_samples:
     47                     # Expand the cluster from the neighbor
     48                     expanded_cluster = self._expand_cluster(
     49                         neighbor_i, self.neighbors[neighbor_i])
     50                     # Add expanded cluster to this cluster
     51                     cluster = cluster + expanded_cluster
     52                 else:
     53                     # If the neighbor is not a core point we only add the neighbor point
     54                     cluster.append(neighbor_i)
     55         return cluster
     56 
     57     def _get_cluster_labels(self):
     58         """ Return the samples labels as the index of the cluster in which they are
     59         contained """
     60         # Set default value to number of clusters
     61         # Will make sure all outliers have same cluster label
     62         labels = np.full(shape=self.X.shape[0], fill_value=len(self.clusters))
     63         for cluster_i, cluster in enumerate(self.clusters):
     64             for sample_i in cluster:
     65                 labels[sample_i] = cluster_i
     66         return labels
     67 
     68     # DBSCAN
     69     def predict(self, X):
     70         self.X = X
     71         self.clusters = []
     72         self.visited_samples = []
     73         self.neighbors = {}
     74         n_samples = np.shape(self.X)[0]
     75         # Iterate through samples and expand clusters from them
     76         # if they have more neighbors than self.min_samples
     77         for sample_i in range(n_samples):
     78             if sample_i in self.visited_samples:
     79                 continue
     80             self.neighbors[sample_i] = self._get_neighbors(sample_i)
     81             if len(self.neighbors[sample_i]) >= self.min_samples:
     82                 # If core point => mark as visited
     83                 self.visited_samples.append(sample_i)
     84                 # Sample has more neighbors than self.min_samples => expand
     85                 # cluster from sample
     86                 new_cluster = self._expand_cluster(
     87                     sample_i, self.neighbors[sample_i])
     88                 # Add cluster to list of clusters
     89                 self.clusters.append(new_cluster)
     90 
     91         # Get the resulting cluster labels
     92         cluster_labels = self._get_cluster_labels()
     93         return cluster_labels