ml-finance-python

python scripts for finance machine learning

git clone https://9o.is/git/ml-finance-python.git

decision_tree.py

(11134B)


      1 from __future__ import division, print_function
      2 import numpy as np
      3 
      4 from mlfromscratch.utils import divide_on_feature, train_test_split, standardize, mean_squared_error
      5 from mlfromscratch.utils import calculate_entropy, accuracy_score, calculate_variance
      6 
      7 class DecisionNode():
      8     """Class that represents a decision node or leaf in the decision tree
      9 
     10     Parameters:
     11     -----------
     12     feature_i: int
     13         Feature index which we want to use as the threshold measure.
     14     threshold: float
     15         The value that we will compare feature values at feature_i against to
     16         determine the prediction.
     17     value: float
     18         The class prediction if classification tree, or float value if regression tree.
     19     true_branch: DecisionNode
     20         Next decision node for samples where features value met the threshold.
     21     false_branch: DecisionNode
     22         Next decision node for samples where features value did not meet the threshold.
     23     """
     24     def __init__(self, feature_i=None, threshold=None,
     25                  value=None, true_branch=None, false_branch=None):
     26         self.feature_i = feature_i          # Index for the feature that is tested
     27         self.threshold = threshold          # Threshold value for feature
     28         self.value = value                  # Value if the node is a leaf in the tree
     29         self.true_branch = true_branch      # 'Left' subtree
     30         self.false_branch = false_branch    # 'Right' subtree
     31 
     32 
     33 # Super class of RegressionTree and ClassificationTree
     34 class DecisionTree(object):
     35     """Super class of RegressionTree and ClassificationTree.
     36 
     37     Parameters:
     38     -----------
     39     min_samples_split: int
     40         The minimum number of samples needed to make a split when building a tree.
     41     min_impurity: float
     42         The minimum impurity required to split the tree further.
     43     max_depth: int
     44         The maximum depth of a tree.
     45     loss: function
     46         Loss function that is used for Gradient Boosting models to calculate impurity.
     47     """
     48     def __init__(self, min_samples_split=2, min_impurity=1e-7,
     49                  max_depth=float("inf"), loss=None):
     50         self.root = None  # Root node in dec. tree
     51         # Minimum n of samples to justify split
     52         self.min_samples_split = min_samples_split
     53         # The minimum impurity to justify split
     54         self.min_impurity = min_impurity
     55         # The maximum depth to grow the tree to
     56         self.max_depth = max_depth
     57         # Function to calculate impurity (classif.=>info gain, regr=>variance reduct.)
     58         self._impurity_calculation = None
     59         # Function to determine prediction of y at leaf
     60         self._leaf_value_calculation = None
     61         # If y is one-hot encoded (multi-dim) or not (one-dim)
     62         self.one_dim = None
     63         # If Gradient Boost
     64         self.loss = loss
     65 
     66     def fit(self, X, y, loss=None):
     67         """ Build decision tree """
     68         self.one_dim = len(np.shape(y)) == 1
     69         self.root = self._build_tree(X, y)
     70         self.loss=None
     71 
     72     def _build_tree(self, X, y, current_depth=0):
     73         """ Recursive method which builds out the decision tree and splits X and respective y
     74         on the feature of X which (based on impurity) best separates the data"""
     75 
     76         largest_impurity = 0
     77         best_criteria = None    # Feature index and threshold
     78         best_sets = None        # Subsets of the data
     79 
     80         # Check if expansion of y is needed
     81         if len(np.shape(y)) == 1:
     82             y = np.expand_dims(y, axis=1)
     83 
     84         # Add y as last column of X
     85         Xy = np.concatenate((X, y), axis=1)
     86 
     87         n_samples, n_features = np.shape(X)
     88 
     89         if n_samples >= self.min_samples_split and current_depth <= self.max_depth:
     90             # Calculate the impurity for each feature
     91             for feature_i in range(n_features):
     92                 # All values of feature_i
     93                 feature_values = np.expand_dims(X[:, feature_i], axis=1)
     94                 unique_values = np.unique(feature_values)
     95 
     96                 # Iterate through all unique values of feature column i and
     97                 # calculate the impurity
     98                 for threshold in unique_values:
     99                     # Divide X and y depending on if the feature value of X at index feature_i
    100                     # meets the threshold
    101                     Xy1, Xy2 = divide_on_feature(Xy, feature_i, threshold)
    102 
    103                     if len(Xy1) > 0 and len(Xy2) > 0:
    104                         # Select the y-values of the two sets
    105                         y1 = Xy1[:, n_features:]
    106                         y2 = Xy2[:, n_features:]
    107 
    108                         # Calculate impurity
    109                         impurity = self._impurity_calculation(y, y1, y2)
    110 
    111                         # If this threshold resulted in a higher information gain than previously
    112                         # recorded save the threshold value and the feature
    113                         # index
    114                         if impurity > largest_impurity:
    115                             largest_impurity = impurity
    116                             best_criteria = {"feature_i": feature_i, "threshold": threshold}
    117                             best_sets = {
    118                                 "leftX": Xy1[:, :n_features],   # X of left subtree
    119                                 "lefty": Xy1[:, n_features:],   # y of left subtree
    120                                 "rightX": Xy2[:, :n_features],  # X of right subtree
    121                                 "righty": Xy2[:, n_features:]   # y of right subtree
    122                                 }
    123 
    124         if largest_impurity > self.min_impurity:
    125             # Build subtrees for the right and left branches
    126             true_branch = self._build_tree(best_sets["leftX"], best_sets["lefty"], current_depth + 1)
    127             false_branch = self._build_tree(best_sets["rightX"], best_sets["righty"], current_depth + 1)
    128             return DecisionNode(feature_i=best_criteria["feature_i"], threshold=best_criteria[
    129                                 "threshold"], true_branch=true_branch, false_branch=false_branch)
    130 
    131         # We're at leaf => determine value
    132         leaf_value = self._leaf_value_calculation(y)
    133 
    134         return DecisionNode(value=leaf_value)
    135 
    136 
    137     def predict_value(self, x, tree=None):
    138         """ Do a recursive search down the tree and make a prediction of the data sample by the
    139             value of the leaf that we end up at """
    140 
    141         if tree is None:
    142             tree = self.root
    143 
    144         # If we have a value (i.e we're at a leaf) => return value as the prediction
    145         if tree.value is not None:
    146             return tree.value
    147 
    148         # Choose the feature that we will test
    149         feature_value = x[tree.feature_i]
    150 
    151         # Determine if we will follow left or right branch
    152         branch = tree.false_branch
    153         if isinstance(feature_value, int) or isinstance(feature_value, float):
    154             if feature_value >= tree.threshold:
    155                 branch = tree.true_branch
    156         elif feature_value == tree.threshold:
    157             branch = tree.true_branch
    158 
    159         # Test subtree
    160         return self.predict_value(x, branch)
    161 
    162     def predict(self, X):
    163         """ Classify samples one by one and return the set of labels """
    164         y_pred = [self.predict_value(sample) for sample in X]
    165         return y_pred
    166 
    167     def print_tree(self, tree=None, indent=" "):
    168         """ Recursively print the decision tree """
    169         if not tree:
    170             tree = self.root
    171 
    172         # If we're at leaf => print the label
    173         if tree.value is not None:
    174             print (tree.value)
    175         # Go deeper down the tree
    176         else:
    177             # Print test
    178             print ("%s:%s? " % (tree.feature_i, tree.threshold))
    179             # Print the true scenario
    180             print ("%sT->" % (indent), end="")
    181             self.print_tree(tree.true_branch, indent + indent)
    182             # Print the false scenario
    183             print ("%sF->" % (indent), end="")
    184             self.print_tree(tree.false_branch, indent + indent)
    185 
    186 
    187 
    188 class XGBoostRegressionTree(DecisionTree):
    189     """
    190     Regression tree for XGBoost
    191     - Reference -
    192     http://xgboost.readthedocs.io/en/latest/model.html
    193     """
    194 
    195     def _split(self, y):
    196         """ y contains y_true in left half of the middle column and
    197         y_pred in the right half. Split and return the two matrices """
    198         col = int(np.shape(y)[1]/2)
    199         y, y_pred = y[:, :col], y[:, col:]
    200         return y, y_pred
    201 
    202     def _gain(self, y, y_pred):
    203         nominator = np.power((y * self.loss.gradient(y, y_pred)).sum(), 2)
    204         denominator = self.loss.hess(y, y_pred).sum()
    205         return 0.5 * (nominator / denominator)
    206 
    207     def _gain_by_taylor(self, y, y1, y2):
    208         # Split
    209         y, y_pred = self._split(y)
    210         y1, y1_pred = self._split(y1)
    211         y2, y2_pred = self._split(y2)
    212 
    213         true_gain = self._gain(y1, y1_pred)
    214         false_gain = self._gain(y2, y2_pred)
    215         gain = self._gain(y, y_pred)
    216         return true_gain + false_gain - gain
    217 
    218     def _approximate_update(self, y):
    219         # y split into y, y_pred
    220         y, y_pred = self._split(y)
    221         # Newton's Method
    222         gradient = np.sum(y * self.loss.gradient(y, y_pred), axis=0)
    223         hessian = np.sum(self.loss.hess(y, y_pred), axis=0)
    224         update_approximation =  gradient / hessian
    225 
    226         return update_approximation
    227 
    228     def fit(self, X, y):
    229         self._impurity_calculation = self._gain_by_taylor
    230         self._leaf_value_calculation = self._approximate_update
    231         super(XGBoostRegressionTree, self).fit(X, y)
    232 
    233 
    234 class RegressionTree(DecisionTree):
    235     def _calculate_variance_reduction(self, y, y1, y2):
    236         var_tot = calculate_variance(y)
    237         var_1 = calculate_variance(y1)
    238         var_2 = calculate_variance(y2)
    239         frac_1 = len(y1) / len(y)
    240         frac_2 = len(y2) / len(y)
    241 
    242         # Calculate the variance reduction
    243         variance_reduction = var_tot - (frac_1 * var_1 + frac_2 * var_2)
    244 
    245         return sum(variance_reduction)
    246 
    247     def _mean_of_y(self, y):
    248         value = np.mean(y, axis=0)
    249         return value if len(value) > 1 else value[0]
    250 
    251     def fit(self, X, y):
    252         self._impurity_calculation = self._calculate_variance_reduction
    253         self._leaf_value_calculation = self._mean_of_y
    254         super(RegressionTree, self).fit(X, y)
    255 
    256 class ClassificationTree(DecisionTree):
    257     def _calculate_information_gain(self, y, y1, y2):
    258         # Calculate information gain
    259         p = len(y1) / len(y)
    260         entropy = calculate_entropy(y)
    261         info_gain = entropy - p * \
    262             calculate_entropy(y1) - (1 - p) * \
    263             calculate_entropy(y2)
    264 
    265         return info_gain
    266 
    267     def _majority_vote(self, y):
    268         most_common = None
    269         max_count = 0
    270         for label in np.unique(y):
    271             # Count number of occurences of samples with label
    272             count = len(y[y == label])
    273             if count > max_count:
    274                 most_common = label
    275                 max_count = count
    276         return most_common
    277 
    278     def fit(self, X, y):
    279         self._impurity_calculation = self._calculate_information_gain
    280         self._leaf_value_calculation = self._majority_vote
    281         super(ClassificationTree, self).fit(X, y)