ml-finance-python

Python scripts for machine learning in finance

git clone https://9o.is/git/ml-finance-python.git

gbm_utils.py

(3521B)


#!/usr/bin/env python
# -*- coding: utf-8 -*-
__author__ = 'Stefan Jansen'

from pathlib import Path
import numpy as np
import pandas as pd

# Print wide DataFrames on one line instead of wrapping them across several.
pd.set_option('display.expand_frame_repr', False)
# Seed NumPy's global RNG at import time so random draws are repeatable.
np.random.seed(42)

"""
This file contains several helper functions 
used to prepare the data and run cross-validation
"""

# HDF5 store read by get_data(); the path is relative to the working directory.
DATA_STORE = Path('../data/assets.h5')
     18 
     19 def format_time(t):
     20     """Return a formatted time string 'HH:MM:SS
     21     based on a numeric time() value"""
     22     m, s = divmod(t, 60)
     23     h, m = divmod(m, 60)
     24     return f'{h:0>2.0f}:{m:0>2.0f}:{s:0>2.0f}'
     25 
     26 
     27 def get_data(start='2000', end='2018', holding_period=1, dropna=False):
     28     """Load dataset created in Chapter 4"""
     29     idx = pd.IndexSlice
     30     target = f'target_{holding_period}m'
     31     with pd.HDFStore(DATA_STORE) as store:
     32         df = store['engineered_features']
     33 
     34     if start is not None and end is not None:
     35         df = df.loc[idx[:, start: end], :]
     36     if dropna:
     37         df = df.dropna()
     38 
     39     y = (df[target] > 0).astype(int)
     40     X = df.drop([c for c in df.columns if c.startswith('target')], axis=1)
     41     return y, X
     42 
     43 
     44 def get_one_hot_data(df, cols=('year', 'month', 'age', 'msize')):
     45     cols = list(cols)
     46     df = pd.get_dummies(df,
     47                         columns=cols + ['sector'],
     48                         prefix=cols + [''],
     49                         prefix_sep=['_'] * len(cols) + [''])
     50     return df.rename(columns={c: c.replace('.0', '').replace(' ', '_').lower() for c in df.columns})
     51 
     52 
     53 def factorize_cats(df, cats=['sector']):
     54     cat_cols = ['year', 'month', 'age', 'msize'] + cats
     55     for cat in cats:
     56         df[cat] = pd.factorize(df[cat])[0]
     57     df.loc[:, cat_cols] = df.loc[:, cat_cols].fillna(-1).astype('category')
     58     return df
     59 
     60 
     61 def get_holdout_set(target, features, period=6):
     62     idx = pd.IndexSlice
     63     label = target.name
     64     dates = np.sort(target.index.get_level_values('date').unique())
     65     cv_start, cv_end = dates[0], dates[-period - 2]
     66     holdout_start, holdout_end = dates[-period - 1], dates[-1]
     67 
     68     df = features.join(target.to_frame())
     69     train = df.loc[idx[:, cv_start: cv_end], :]
     70     y_train, X_train = train[label], train.drop(label, axis=1)
     71 
     72     test = df.loc[idx[:, holdout_start: holdout_end], :]
     73     y_test, X_test = test[label], test.drop(label, axis=1)
     74     return y_train, X_train, y_test, X_test
     75 
     76 
     77 class OneStepTimeSeriesSplit:
     78     """Generates tuples of train_idx, test_idx pairs
     79     Assumes the index contains a level labeled 'date'"""
     80 
     81     def __init__(self, n_splits=3, test_period_length=1, shuffle=False):
     82         self.n_splits = n_splits
     83         self.test_period_length = test_period_length
     84         self.shuffle = shuffle
     85         self.test_end = n_splits * test_period_length
     86 
     87     @staticmethod
     88     def chunks(l, chunk_size):
     89         for i in range(0, len(l), chunk_size):
     90             yield l[i:i + chunk_size]
     91 
     92     def split(self, X, y=None, groups=None):
     93         unique_dates = (X.index
     94                             .get_level_values('date')
     95                             .unique()
     96                             .sort_values(ascending=False)[:self.test_end])
     97 
     98         dates = X.reset_index()[['date']]
     99         for test_date in self.chunks(unique_dates, self.test_period_length):
    100             train_idx = dates[dates.date < min(test_date)].index
    101             test_idx = dates[dates.date.isin(test_date)].index
    102             if self.shuffle:
    103                 np.random.shuffle(list(train_idx))
    104             yield train_idx, test_idx
    105 
    106     def get_n_splits(self, X, y, groups=None):
    107         return self.n_splits