ml-finance-python
python scripts for finance machine learning
git clone https://9o.is/git/ml-finance-python.git
decision_tree.py
(11134B)
1 from __future__ import division, print_function
2 import numpy as np
3
4 from mlfromscratch.utils import divide_on_feature, train_test_split, standardize, mean_squared_error
5 from mlfromscratch.utils import calculate_entropy, accuracy_score, calculate_variance
6
class DecisionNode():
    """A single node of a decision tree: an internal test or a leaf.

    Parameters:
    -----------
    feature_i: int
        Index of the feature column that this node tests.
    threshold: float
        Value that a sample's feature at feature_i is compared against.
    value: float
        Prediction stored at the node (class label for a classification
        tree, float for a regression tree). None by default.
    true_branch: DecisionNode
        Subtree for samples whose feature value met the threshold.
    false_branch: DecisionNode
        Subtree for samples whose feature value did not meet the threshold.
    """
    def __init__(self, feature_i=None, threshold=None,
                 value=None, true_branch=None, false_branch=None):
        # Description of the test performed at this node
        self.feature_i, self.threshold = feature_i, threshold
        # Prediction payload when the node is a leaf
        self.value = value
        # Children: 'left' (test met) and 'right' (test not met) subtrees
        self.true_branch, self.false_branch = true_branch, false_branch
31
32
33 # Super class of RegressionTree and ClassificationTree
class DecisionTree(object):
    """Super class of RegressionTree and ClassificationTree.

    Subclasses must assign `_impurity_calculation` and
    `_leaf_value_calculation` before `fit` builds the tree; both are
    initialised to None here.

    Parameters:
    -----------
    min_samples_split: int
        The minimum number of samples needed to make a split when building a tree.
    min_impurity: float
        The minimum impurity required to split the tree further.
    max_depth: int
        The maximum depth of a tree.
    loss: function
        Loss function that is used for Gradient Boosting models to calculate impurity.
    """
    def __init__(self, min_samples_split=2, min_impurity=1e-7,
                 max_depth=float("inf"), loss=None):
        self.root = None  # Root node in dec. tree
        # Minimum n of samples to justify split
        self.min_samples_split = min_samples_split
        # The minimum impurity to justify split
        self.min_impurity = min_impurity
        # The maximum depth to grow the tree to
        self.max_depth = max_depth
        # Function to calculate impurity (classif.=>info gain, regr=>variance reduct.)
        self._impurity_calculation = None
        # Function to determine prediction of y at leaf
        self._leaf_value_calculation = None
        # If y is one-hot encoded (multi-dim) or not (one-dim)
        self.one_dim = None
        # If Gradient Boost
        self.loss = loss

    def fit(self, X, y, loss=None):
        """ Build decision tree

        Parameters:
        -----------
        X: array of shape (n_samples, n_features)
            Training samples; must support NumPy column slicing.
        y: array
            Targets, either 1-D or 2-D with one row per sample.
        loss: function
            Optional loss. When given it replaces the loss passed to the
            constructor; when omitted the constructor's loss is kept.
        """
        if loss is not None:
            self.loss = loss
        self.one_dim = len(np.shape(y)) == 1
        self.root = self._build_tree(X, y)
        # self.loss is intentionally NOT reset to None afterwards: clearing it
        # made a second fit() fail for subclasses that read self.loss.

    def _build_tree(self, X, y, current_depth=0):
        """ Recursive method which builds out the decision tree and splits X and respective y
        on the feature of X which (based on impurity) best separates the data.

        Returns a DecisionNode: an internal node when a split whose impurity
        score exceeds `min_impurity` was found, otherwise a leaf holding
        `_leaf_value_calculation(y)`. """

        largest_impurity = 0
        best_criteria = None  # Feature index and threshold
        best_sets = None      # Subsets of the data

        # Check if expansion of y is needed
        if len(np.shape(y)) == 1:
            y = np.expand_dims(y, axis=1)

        # Add y as last column(s) of X so one split call divides both together
        Xy = np.concatenate((X, y), axis=1)

        n_samples, n_features = np.shape(X)

        # NOTE: the depth comparison is inclusive, so a node sitting at
        # depth == max_depth may still be split.
        if n_samples >= self.min_samples_split and current_depth <= self.max_depth:
            # Calculate the impurity for each feature
            for feature_i in range(n_features):
                # Every distinct value of the column is a candidate threshold
                # (np.unique flattens, so no expand_dims is needed first)
                for threshold in np.unique(X[:, feature_i]):
                    # Divide X and y depending on if the feature value of X at index feature_i
                    # meets the threshold
                    Xy1, Xy2 = divide_on_feature(Xy, feature_i, threshold)

                    # A split that leaves one side empty separates nothing
                    if len(Xy1) == 0 or len(Xy2) == 0:
                        continue

                    # Select the y-values of the two sets
                    y1 = Xy1[:, n_features:]
                    y2 = Xy2[:, n_features:]

                    # Calculate impurity
                    impurity = self._impurity_calculation(y, y1, y2)

                    # If this threshold resulted in a higher information gain than previously
                    # recorded save the threshold value and the feature index
                    if impurity > largest_impurity:
                        largest_impurity = impurity
                        best_criteria = {"feature_i": feature_i, "threshold": threshold}
                        best_sets = {
                            "leftX": Xy1[:, :n_features],   # X of left subtree
                            "lefty": y1,                    # y of left subtree
                            "rightX": Xy2[:, :n_features],  # X of right subtree
                            "righty": y2,                   # y of right subtree
                        }

        if largest_impurity > self.min_impurity:
            # Build subtrees for the right and left branches
            true_branch = self._build_tree(best_sets["leftX"], best_sets["lefty"], current_depth + 1)
            false_branch = self._build_tree(best_sets["rightX"], best_sets["righty"], current_depth + 1)
            return DecisionNode(feature_i=best_criteria["feature_i"],
                                threshold=best_criteria["threshold"],
                                true_branch=true_branch, false_branch=false_branch)

        # We're at leaf => determine value
        leaf_value = self._leaf_value_calculation(y)

        return DecisionNode(value=leaf_value)

    def predict_value(self, x, tree=None):
        """ Do a recursive search down the tree and make a prediction of the data sample by the
        value of the leaf that we end up at """

        if tree is None:
            tree = self.root

        # If we have a value (i.e we're at a leaf) => return value as the prediction
        if tree.value is not None:
            return tree.value

        # Choose the feature that we will test
        feature_value = x[tree.feature_i]

        # Determine if we will follow left or right branch.
        # NOTE(review): only Python int/float instances take the ">=" path;
        # anything else (strings, and NumPy scalars that do not subclass
        # int/float) is matched by equality. Presumably this mirrors how
        # divide_on_feature splits during training -- confirm before changing.
        if isinstance(feature_value, int) or isinstance(feature_value, float):
            meets_threshold = feature_value >= tree.threshold
        else:
            meets_threshold = feature_value == tree.threshold
        branch = tree.true_branch if meets_threshold else tree.false_branch

        # Test subtree
        return self.predict_value(x, branch)

    def predict(self, X):
        """ Classify samples one by one and return the list of predictions """
        return [self.predict_value(sample) for sample in X]

    def print_tree(self, tree=None, indent=" "):
        """ Recursively print the decision tree """
        if not tree:
            tree = self.root

        # If we're at leaf => print the label
        if tree.value is not None:
            print (tree.value)
        # Go deeper down the tree
        else:
            # Print test
            print ("%s:%s? " % (tree.feature_i, tree.threshold))
            # Print the true scenario
            print ("%sT->" % (indent), end="")
            self.print_tree(tree.true_branch, indent + indent)
            # Print the false scenario
            print ("%sF->" % (indent), end="")
            self.print_tree(tree.false_branch, indent + indent)
185
186
187
class XGBoostRegressionTree(DecisionTree):
    """
    Regression tree for XGBoost
    - Reference -
    http://xgboost.readthedocs.io/en/latest/model.html
    """

    def _split(self, y):
        """ The combined matrix holds y_true in the left half of its columns
        and y_pred in the right half. Cut it at the middle column and return
        the pair (y_true, y_pred) """
        half = np.shape(y)[1] // 2
        return y[:, :half], y[:, half:]

    def _gain(self, y, y_pred):
        """ Score a set of samples: half of the squared sum of y * gradient,
        divided by the summed hessian, both taken from self.loss """
        grad_total = (y * self.loss.gradient(y, y_pred)).sum()
        hess_total = self.loss.hess(y, y_pred).sum()
        return 0.5 * (np.power(grad_total, 2) / hess_total)

    def _gain_by_taylor(self, y, y1, y2):
        """ Impurity measure for a candidate split: gain of both children
        minus the gain of the unsplit parent set """
        # Unpack every combined matrix into its (y_true, y_pred) pair
        parent, on_true, on_false = [self._split(part) for part in (y, y1, y2)]

        gain_true = self._gain(*on_true)
        gain_false = self._gain(*on_false)
        gain_parent = self._gain(*parent)
        return gain_true + gain_false - gain_parent

    def _approximate_update(self, y):
        """ Leaf value: column-wise sum of y * gradient divided by the
        column-wise sum of the hessian (Newton's Method) """
        y_true, y_pred = self._split(y)
        grad_sum = np.sum(y_true * self.loss.gradient(y_true, y_pred), axis=0)
        hess_sum = np.sum(self.loss.hess(y_true, y_pred), axis=0)
        return grad_sum / hess_sum

    def fit(self, X, y):
        """ Install the XGBoost leaf and impurity callbacks, then build the
        tree through the base class """
        self._leaf_value_calculation = self._approximate_update
        self._impurity_calculation = self._gain_by_taylor
        super(XGBoostRegressionTree, self).fit(X, y)
232
233
class RegressionTree(DecisionTree):
    """Decision tree that scores splits by variance reduction and predicts
    the mean of the targets that reach a leaf."""

    def _calculate_variance_reduction(self, y, y1, y2):
        """ Impurity measure for a candidate split: variance of y minus the
        size-weighted variances of the two subsets y1 and y2, summed over the
        entries returned by calculate_variance """
        total_variance = calculate_variance(y)
        variance_true = calculate_variance(y1)
        variance_false = calculate_variance(y2)

        # Share of the parent's samples that landed in each subset
        n_parent = len(y)
        weight_true = len(y1) / n_parent
        weight_false = len(y2) / n_parent

        weighted_children = weight_true * variance_true + weight_false * variance_false
        reduction = total_variance - weighted_children

        return sum(reduction)

    def _mean_of_y(self, y):
        """ Leaf value: column-wise mean of y, unwrapped to a scalar when
        there is only a single column """
        column_means = np.mean(y, axis=0)
        if len(column_means) > 1:
            return column_means
        return column_means[0]

    def fit(self, X, y):
        """ Install the regression leaf and impurity callbacks, then build
        the tree through the base class """
        self._leaf_value_calculation = self._mean_of_y
        self._impurity_calculation = self._calculate_variance_reduction
        super(RegressionTree, self).fit(X, y)
255
class ClassificationTree(DecisionTree):
    """Decision tree that scores splits by information gain and predicts the
    most frequent label among the targets that reach a leaf."""

    def _calculate_information_gain(self, y, y1, y2):
        """ Impurity measure for a candidate split: entropy of y minus the
        entropies of y1 and y2, each weighted by its share of the samples """
        share_true = len(y1) / len(y)
        parent_entropy = calculate_entropy(y)
        entropy_true = calculate_entropy(y1)
        entropy_false = calculate_entropy(y2)

        return parent_entropy - share_true * entropy_true - (1 - share_true) * entropy_false

    def _majority_vote(self, y):
        """ Leaf value: the label occurring most often in y. Ties go to the
        label that np.unique lists first; None is returned when no label has
        a positive count """
        winner, winner_count = None, 0
        for candidate in np.unique(y):
            # Count number of occurrences of samples with this label
            occurrences = int(np.count_nonzero(y == candidate))
            if occurrences > winner_count:
                winner, winner_count = candidate, occurrences
        return winner

    def fit(self, X, y):
        """ Install the classification leaf and impurity callbacks, then
        build the tree through the base class """
        self._leaf_value_calculation = self._majority_vote
        self._impurity_calculation = self._calculate_information_gain
        super(ClassificationTree, self).fit(X, y)