ml-finance-python
python scripts for finance machine learning
git clone https://9o.is/git/ml-finance-python.git
gbm_utils.py
(3521B)
1 #!/usr/bin/env python
2 # -*- coding: utf-8 -*-
3 __author__ = 'Stefan Jansen'
4
5 from pathlib import Path
6 import numpy as np
7 import pandas as pd
8
# Keep wide DataFrames on one line when printed instead of wrapping columns.
pd.options.display.expand_frame_repr = False
# Seed NumPy's global random number generator for reproducible runs.
np.random.seed(42)

"""
This file contains several helper functions
used to prepare the data and run cross-validation
"""

# HDF5 store (relative path) from which get_data() reads 'engineered_features'.
DATA_STORE = Path('..') / 'data' / 'assets.h5'
18
def format_time(t):
    """Return a formatted time string 'HH:MM:SS' for a duration in seconds.

    The duration is rounded to whole seconds *before* it is split into
    hours, minutes and seconds. Rounding each component separately (as a
    plain ``.0f`` format would do) can produce invalid output such as
    '00:00:60' for 59.6 seconds, because the rounded seconds never carry
    over into the minutes.

    Args:
        t: Elapsed time in seconds, e.g. a difference of two ``time()``
            values. May be an int or a float.

    Returns:
        The zero-padded string 'HH:MM:SS'. The hours field grows beyond
        two digits when needed.
    """
    total_seconds = int(round(t))
    minutes, seconds = divmod(total_seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f'{hours:02d}:{minutes:02d}:{seconds:02d}'
25
26
def get_data(start='2000', end='2018', holding_period=1, dropna=False):
    """Load dataset created in Chapter 4 and split it into label and features.

    Reads the 'engineered_features' table from ``DATA_STORE``, optionally
    restricts it to a range on the second index level, and derives a binary
    label from the ``target_{holding_period}m`` column.

    Args:
        start: Lower bound (inclusive) for the second index level, or None
            for an open lower bound.
        end: Upper bound (inclusive) for the second index level, or None
            for an open upper bound.
        holding_period: Selects the target column ``target_{holding_period}m``.
        dropna: If True, drop every row containing a missing value before
            the label and features are derived.

    Returns:
        Tuple ``(y, X)``: ``y`` is 1 where the selected target is positive
        and 0 otherwise; ``X`` holds all columns whose name does not start
        with 'target'.
    """
    idx = pd.IndexSlice
    target = f'target_{holding_period}m'
    # Open read-only: the default append mode would create an empty store
    # when the file is missing and only fail later with a confusing KeyError.
    with pd.HDFStore(DATA_STORE, mode='r') as store:
        df = store['engineered_features']

    # A None bound leaves that side of the slice open, so a one-sided range
    # (only start or only end) is honoured instead of silently ignored.
    if start is not None or end is not None:
        df = df.loc[idx[:, start:end], :]
    if dropna:
        df = df.dropna()

    y = (df[target] > 0).astype(int)
    target_cols = [c for c in df.columns if c.startswith('target')]
    X = df.drop(target_cols, axis=1)
    return y, X
42
43
def get_one_hot_data(df, cols=('year', 'month', 'age', 'msize')):
    """One-hot encode ``cols`` and the 'sector' column, then tidy column names.

    Dummy columns for each entry of ``cols`` are named ``<col>_<value>``;
    dummies for 'sector' carry the bare value with no prefix. Afterwards
    every column name has '.0' removed, spaces replaced by underscores and
    is lower-cased.

    Args:
        df: DataFrame containing the columns in ``cols`` and 'sector'.
        cols: Names of the columns to encode with a ``<col>_`` prefix.

    Returns:
        A new DataFrame with the encoded, renamed columns.
    """
    prefixed = list(cols)

    def _tidy(name):
        # Same normalisation chain for every column name.
        return name.replace('.0', '').replace(' ', '_').lower()

    encoded = pd.get_dummies(
        df,
        columns=prefixed + ['sector'],
        prefix=prefixed + [''],
        prefix_sep=['_'] * len(prefixed) + [''],
    )
    renamed = {name: _tidy(name) for name in encoded.columns}
    return encoded.rename(columns=renamed)
51
52
def factorize_cats(df, cats=('sector',)):
    """Integer-encode ``cats`` and cast all categorical columns to 'category'.

    The columns in ``cats`` are replaced by integer codes from
    ``pd.factorize`` (missing values become -1). Then 'year', 'month',
    'age', 'msize' and the ``cats`` columns have their remaining missing
    values filled with -1 and are converted to the pandas 'category' dtype.

    Note: ``df`` is modified in place and also returned.

    Args:
        df: DataFrame containing 'year', 'month', 'age', 'msize' and every
            column named in ``cats``.
        cats: Name(s) of the columns to factorize; a single string or an
            iterable of strings.

    Returns:
        The same DataFrame object with the converted columns.
    """
    cats = [cats] if isinstance(cats, str) else list(cats)
    cat_cols = ['year', 'month', 'age', 'msize'] + cats
    for cat in cats:
        df[cat] = pd.factorize(df[cat])[0]
    # Replace each column rather than assigning through ``df.loc[:, cols]``:
    # the latter writes values into the existing columns and keeps their old
    # dtype, so the 'category' conversion would be lost.
    for col in cat_cols:
        df[col] = df[col].fillna(-1).astype('category')
    return df
59
60
def get_holdout_set(target, features, period=6):
    """Split label and features into a training part and a final hold-out part.

    The unique values of the 'date' index level are sorted. The hold-out
    set spans the last ``period + 1`` dates (from ``dates[-period - 1]``
    through ``dates[-1]``, both inclusive); the training set spans
    everything from the first date up to and including the date just
    before the hold-out window.

    Args:
        target: Named Series of labels whose index has a 'date' level.
        features: DataFrame of features, joinable with ``target`` on its index.
        period: Controls the hold-out length as described above.

    Returns:
        Tuple ``(y_train, X_train, y_test, X_test)``.
    """
    label = target.name
    all_dates = np.sort(target.index.get_level_values('date').unique())
    train_bounds = (all_dates[0], all_dates[-period - 2])
    holdout_bounds = (all_dates[-period - 1], all_dates[-1])

    combined = features.join(target.to_frame())

    def _window(first, last):
        # Label-based slice on the second index level; both ends inclusive.
        part = combined.loc[pd.IndexSlice[:, first:last], :]
        return part[label], part.drop(label, axis=1)

    y_train, X_train = _window(*train_bounds)
    y_test, X_test = _window(*holdout_bounds)
    return y_train, X_train, y_test, X_test
75
76
class OneStepTimeSeriesSplit:
    """Generates tuples of train_idx, test_idx pairs.

    Assumes the index contains a level labeled 'date'. Test sets are built
    from the most recent dates, walking backwards in time; each training
    set contains every row dated strictly before its test window.
    """

    def __init__(self, n_splits=3, test_period_length=1, shuffle=False):
        """Store the split configuration.

        Args:
            n_splits: Number of train/test pairs to generate.
            test_period_length: Number of unique dates in each test set.
            shuffle: If True, the training positions of each split are
                returned in random order (using NumPy's global RNG).
        """
        self.n_splits = n_splits
        self.test_period_length = test_period_length
        self.shuffle = shuffle
        # Total number of most-recent unique dates consumed by all test sets.
        self.test_end = n_splits * test_period_length

    @staticmethod
    def chunks(l, chunk_size):
        """Yield consecutive slices of ``l`` with at most ``chunk_size`` items."""
        for i in range(0, len(l), chunk_size):
            yield l[i:i + chunk_size]

    def split(self, X, y=None, groups=None):
        """Yield ``(train_idx, test_idx)`` positional indices for each split.

        Args:
            X: Object whose index has a 'date' level.
            y: Ignored.
            groups: Ignored.

        Yields:
            Pairs of pandas Index objects holding row positions in ``X``.
            Fewer than ``n_splits`` pairs are produced when ``X`` has fewer
            than ``n_splits * test_period_length`` unique dates.
        """
        date_level = X.index.get_level_values('date')
        unique_dates = (date_level
                        .unique()
                        .sort_values(ascending=False)[:self.test_end])

        # Dates keyed by row position (default RangeIndex); avoids copying
        # the whole frame just to look at one index level.
        dates = pd.Series(date_level)
        for test_dates in self.chunks(unique_dates, self.test_period_length):
            train_idx = dates[dates < test_dates.min()].index
            test_idx = dates[dates.isin(test_dates)].index
            if self.shuffle:
                # Reorder the index itself; shuffling ``list(train_idx)``
                # would only permute a temporary copy and have no effect.
                train_idx = train_idx[np.random.permutation(len(train_idx))]
            yield train_idx, test_idx

    def get_n_splits(self, X=None, y=None, groups=None):
        """Return the configured number of splits; all arguments are ignored."""
        return self.n_splits