ml-finance-python
python scripts for finance machine learning
git clone https://9o.is/git/ml-finance-python.git
edhec_risk_kit_204.py
(22309B)
1 import pandas as pd
2 import numpy as np
3
4 def get_ffme_returns():
5 """
6 Load the Fama-French Dataset for the returns of the Top and Bottom Deciles by MarketCap
7 """
8 me_m = pd.read_csv("data/Portfolios_Formed_on_ME_monthly_EW.csv",
9 header=0, index_col=0, na_values=-99.99)
10 rets = me_m[['Lo 10', 'Hi 10']]
11 rets.columns = ['SmallCap', 'LargeCap']
12 rets = rets/100
13 rets.index = pd.to_datetime(rets.index, format="%Y%m").to_period('M')
14 return rets
15
16 def get_fff_returns():
17 """
18 Load the Fama-French Research Factor Monthly Dataset
19 """
20 rets = pd.read_csv("data/F-F_Research_Data_Factors_m.csv",
21 header=0, index_col=0, na_values=-99.99)/100
22 rets.index = pd.to_datetime(rets.index, format="%Y%m").to_period('M')
23 return rets
24
25
26 def get_hfi_returns():
27 """
28 Load and format the EDHEC Hedge Fund Index Returns
29 """
30 hfi = pd.read_csv("data/edhec-hedgefundindices.csv",
31 header=0, index_col=0, parse_dates=True)
32 hfi = hfi/100
33 hfi.index = hfi.index.to_period('M')
34 return hfi
35
36 def get_ind_file(filetype, weighting="vw", n_inds=30):
37 """
38 Load and format the Ken French Industry Portfolios files
39 Variant is a tuple of (weighting, size) where:
40 weighting is one of "ew", "vw"
41 number of inds is 30 or 49
42 """
43 if filetype is "returns":
44 name = f"{weighting}_rets"
45 divisor = 100
46 elif filetype is "nfirms":
47 name = "nfirms"
48 divisor = 1
49 elif filetype is "size":
50 name = "size"
51 divisor = 1
52 else:
53 raise ValueError(f"filetype must be one of: returns, nfirms, size")
54
55 ind = pd.read_csv(f"data/ind{n_inds}_m_{name}.csv", header=0, index_col=0, na_values=-99.99)/divisor
56 ind.index = pd.to_datetime(ind.index, format="%Y%m").to_period('M')
57 ind.columns = ind.columns.str.strip()
58 return ind
59
60 def get_ind_returns(weighting="vw", n_inds=30):
61 """
62 Load and format the Ken French Industry Portfolios Monthly Returns
63 """
64 return get_ind_file("returns", weighting=weighting, n_inds=n_inds)
65
66 def get_ind_nfirms(n_inds=30):
67 """
68 Load and format the Ken French 30 Industry Portfolios Average number of Firms
69 """
70 return get_ind_file("nfirms", n_inds=n_inds)
71
72 def get_ind_size(n_inds=30):
73 """
74 Load and format the Ken French 30 Industry Portfolios Average size (market cap)
75 """
76 return get_ind_file("size", n_inds=n_inds)
77
78
79 def get_ind_market_caps(n_inds=30, weights=False):
80 """
81 Load the industry portfolio data and derive the market caps
82 """
83 ind_nfirms = get_ind_nfirms(n_inds=n_inds)
84 ind_size = get_ind_size(n_inds=n_inds)
85 ind_mktcap = ind_nfirms * ind_size
86 if weights:
87 total_mktcap = ind_mktcap.sum(axis=1)
88 ind_capweight = ind_mktcap.divide(total_mktcap, axis="rows")
89 return ind_capweight
90 #else
91 return ind_mktcap
92
93 def get_total_market_index_returns(n_inds=30):
94 """
95 Load the 30 industry portfolio data and derive the returns of a capweighted total market index
96 """
97 ind_capweight = get_ind_market_caps(n_inds=n_inds)
98 ind_return = get_ind_returns(weighting="vw", n_inds=n_inds)
99 total_market_return = (ind_capweight * ind_return).sum(axis="columns")
100 return total_market_return
101
102 def skewness(r):
103 """
104 Alternative to scipy.stats.skew()
105 Computes the skewness of the supplied Series or DataFrame
106 Returns a float or a Series
107 """
108 demeaned_r = r - r.mean()
109 # use the population standard deviation, so set dof=0
110 sigma_r = r.std(ddof=0)
111 exp = (demeaned_r**3).mean()
112 return exp/sigma_r**3
113
114
115 def kurtosis(r):
116 """
117 Alternative to scipy.stats.kurtosis()
118 Computes the kurtosis of the supplied Series or DataFrame
119 Returns a float or a Series
120 """
121 demeaned_r = r - r.mean()
122 # use the population standard deviation, so set dof=0
123 sigma_r = r.std(ddof=0)
124 exp = (demeaned_r**4).mean()
125 return exp/sigma_r**4
126
127
128 def compound(r):
129 """
130 returns the result of compounding the set of returns in r
131 """
132 return np.expm1(np.log1p(r).sum())
133
134
135 def annualize_rets(r, periods_per_year):
136 """
137 Annualizes a set of returns
138 We should infer the periods per year
139 but that is currently left as an exercise
140 to the reader :-)
141 """
142 compounded_growth = (1+r).prod()
143 n_periods = r.shape[0]
144 return compounded_growth**(periods_per_year/n_periods)-1
145
146
147 def annualize_vol(r, periods_per_year):
148 """
149 Annualizes the vol of a set of returns
150 We should infer the periods per year
151 but that is currently left as an exercise
152 to the reader :-)
153 """
154 return r.std()*(periods_per_year**0.5)
155
156
157 def sharpe_ratio(r, riskfree_rate, periods_per_year):
158 """
159 Computes the annualized sharpe ratio of a set of returns
160 """
161 # convert the annual riskfree rate to per period
162 rf_per_period = (1+riskfree_rate)**(1/periods_per_year)-1
163 excess_ret = r - rf_per_period
164 ann_ex_ret = annualize_rets(excess_ret, periods_per_year)
165 ann_vol = annualize_vol(r, periods_per_year)
166 return ann_ex_ret/ann_vol
167
168
169 import scipy.stats
170 def is_normal(r, level=0.01):
171 """
172 Applies the Jarque-Bera test to determine if a Series is normal or not
173 Test is applied at the 1% level by default
174 Returns True if the hypothesis of normality is accepted, False otherwise
175 """
176 if isinstance(r, pd.DataFrame):
177 return r.aggregate(is_normal)
178 else:
179 statistic, p_value = scipy.stats.jarque_bera(r)
180 return p_value > level
181
182
183 def drawdown(return_series: pd.Series):
184 """Takes a time series of asset returns.
185 returns a DataFrame with columns for
186 the wealth index,
187 the previous peaks, and
188 the percentage drawdown
189 """
190 wealth_index = 1000*(1+return_series).cumprod()
191 previous_peaks = wealth_index.cummax()
192 drawdowns = (wealth_index - previous_peaks)/previous_peaks
193 return pd.DataFrame({"Wealth": wealth_index,
194 "Previous Peak": previous_peaks,
195 "Drawdown": drawdowns})
196
197
198 def semideviation(r):
199 """
200 Returns the semideviation aka negative semideviation of r
201 r must be a Series or a DataFrame, else raises a TypeError
202 """
203 if isinstance(r, pd.Series):
204 is_negative = r < 0
205 return r[is_negative].std(ddof=0)
206 elif isinstance(r, pd.DataFrame):
207 return r.aggregate(semideviation)
208 else:
209 raise TypeError("Expected r to be a Series or DataFrame")
210
211
212 def var_historic(r, level=5):
213 """
214 Returns the historic Value at Risk at a specified level
215 i.e. returns the number such that "level" percent of the returns
216 fall below that number, and the (100-level) percent are above
217 """
218 if isinstance(r, pd.DataFrame):
219 return r.aggregate(var_historic, level=level)
220 elif isinstance(r, pd.Series):
221 return -np.percentile(r, level)
222 else:
223 raise TypeError("Expected r to be a Series or DataFrame")
224
225
226 def cvar_historic(r, level=5):
227 """
228 Computes the Conditional VaR of Series or DataFrame
229 """
230 if isinstance(r, pd.Series):
231 is_beyond = r <= -var_historic(r, level=level)
232 return -r[is_beyond].mean()
233 elif isinstance(r, pd.DataFrame):
234 return r.aggregate(cvar_historic, level=level)
235 else:
236 raise TypeError("Expected r to be a Series or DataFrame")
237
238
239 from scipy.stats import norm
240 def var_gaussian(r, level=5, modified=False):
241 """
242 Returns the Parametric Gauusian VaR of a Series or DataFrame
243 If "modified" is True, then the modified VaR is returned,
244 using the Cornish-Fisher modification
245 """
246 # compute the Z score assuming it was Gaussian
247 z = norm.ppf(level/100)
248 if modified:
249 # modify the Z score based on observed skewness and kurtosis
250 s = skewness(r)
251 k = kurtosis(r)
252 z = (z +
253 (z**2 - 1)*s/6 +
254 (z**3 -3*z)*(k-3)/24 -
255 (2*z**3 - 5*z)*(s**2)/36
256 )
257 return -(r.mean() + z*r.std(ddof=0))
258
259
260 def portfolio_return(weights, returns):
261 """
262 Computes the return on a portfolio from constituent returns and weights
263 weights are a numpy array or Nx1 matrix and returns are a numpy array or Nx1 matrix
264 """
265 return weights.T @ returns
266
267
268 def portfolio_vol(weights, covmat):
269 """
270 Computes the vol of a portfolio from a covariance matrix and constituent weights
271 weights are a numpy array or N x 1 maxtrix and covmat is an N x N matrix
272 """
273 return (weights.T @ covmat @ weights)**0.5
274
275
276 def plot_ef2(n_points, er, cov):
277 """
278 Plots the 2-asset efficient frontier
279 """
280 if er.shape[0] != 2 or er.shape[0] != 2:
281 raise ValueError("plot_ef2 can only plot 2-asset frontiers")
282 weights = [np.array([w, 1-w]) for w in np.linspace(0, 1, n_points)]
283 rets = [portfolio_return(w, er) for w in weights]
284 vols = [portfolio_vol(w, cov) for w in weights]
285 ef = pd.DataFrame({
286 "Returns": rets,
287 "Volatility": vols
288 })
289 return ef.plot.line(x="Volatility", y="Returns", style=".-")
290
291
292 from scipy.optimize import minimize
293
294 def minimize_vol(target_return, er, cov):
295 """
296 Returns the optimal weights that achieve the target return
297 given a set of expected returns and a covariance matrix
298 """
299 n = er.shape[0]
300 init_guess = np.repeat(1/n, n)
301 bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
302 # construct the constraints
303 weights_sum_to_1 = {'type': 'eq',
304 'fun': lambda weights: np.sum(weights) - 1
305 }
306 return_is_target = {'type': 'eq',
307 'args': (er,),
308 'fun': lambda weights, er: target_return - portfolio_return(weights,er)
309 }
310 weights = minimize(portfolio_vol, init_guess,
311 args=(cov,), method='SLSQP',
312 options={'disp': False},
313 constraints=(weights_sum_to_1,return_is_target),
314 bounds=bounds)
315 return weights.x
316
317
318 def tracking_error(r_a, r_b):
319 """
320 Returns the Tracking Error between the two return series
321 """
322 return np.sqrt(((r_a - r_b)**2).sum())
323
324
325 def msr(riskfree_rate, er, cov):
326 """
327 Returns the weights of the portfolio that gives you the maximum sharpe ratio
328 given the riskfree rate and expected returns and a covariance matrix
329 """
330 n = er.shape[0]
331 init_guess = np.repeat(1/n, n)
332 bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
333 # construct the constraints
334 weights_sum_to_1 = {'type': 'eq',
335 'fun': lambda weights: np.sum(weights) - 1
336 }
337 def neg_sharpe(weights, riskfree_rate, er, cov):
338 """
339 Returns the negative of the sharpe ratio
340 of the given portfolio
341 """
342 r = portfolio_return(weights, er)
343 vol = portfolio_vol(weights, cov)
344 return -(r - riskfree_rate)/vol
345
346 weights = minimize(neg_sharpe, init_guess,
347 args=(riskfree_rate, er, cov), method='SLSQP',
348 options={'disp': False},
349 constraints=(weights_sum_to_1,),
350 bounds=bounds)
351 return weights.x
352
353
354 def gmv(cov):
355 """
356 Returns the weights of the Global Minimum Volatility portfolio
357 given a covariance matrix
358 """
359 n = cov.shape[0]
360 return msr(0, np.repeat(1, n), cov)
361
362
363 def optimal_weights(n_points, er, cov):
364 """
365 Returns a list of weights that represent a grid of n_points on the efficient frontier
366 """
367 target_rs = np.linspace(er.min(), er.max(), n_points)
368 weights = [minimize_vol(target_return, er, cov) for target_return in target_rs]
369 return weights
370
371
372 def plot_ef(n_points, er, cov, style='.-', legend=False, show_cml=False, riskfree_rate=0, show_ew=False, show_gmv=False):
373 """
374 Plots the multi-asset efficient frontier
375 """
376 weights = optimal_weights(n_points, er, cov)
377 rets = [portfolio_return(w, er) for w in weights]
378 vols = [portfolio_vol(w, cov) for w in weights]
379 ef = pd.DataFrame({
380 "Returns": rets,
381 "Volatility": vols
382 })
383 ax = ef.plot.line(x="Volatility", y="Returns", style=style, legend=legend)
384 if show_cml:
385 ax.set_xlim(left = 0)
386 # get MSR
387 w_msr = msr(riskfree_rate, er, cov)
388 r_msr = portfolio_return(w_msr, er)
389 vol_msr = portfolio_vol(w_msr, cov)
390 # add CML
391 cml_x = [0, vol_msr]
392 cml_y = [riskfree_rate, r_msr]
393 ax.plot(cml_x, cml_y, color='green', marker='o', linestyle='dashed', linewidth=2, markersize=10)
394 if show_ew:
395 n = er.shape[0]
396 w_ew = np.repeat(1/n, n)
397 r_ew = portfolio_return(w_ew, er)
398 vol_ew = portfolio_vol(w_ew, cov)
399 # add EW
400 ax.plot([vol_ew], [r_ew], color='goldenrod', marker='o', markersize=10)
401 if show_gmv:
402 w_gmv = gmv(cov)
403 r_gmv = portfolio_return(w_gmv, er)
404 vol_gmv = portfolio_vol(w_gmv, cov)
405 # add EW
406 ax.plot([vol_gmv], [r_gmv], color='midnightblue', marker='o', markersize=10)
407
408 return ax
409
410
411 def run_cppi(risky_r, safe_r=None, m=3, start=1000, floor=0.8, riskfree_rate=0.03, drawdown=None):
412 """
413 Run a backtest of the CPPI strategy, given a set of returns for the risky asset
414 Returns a dictionary containing: Asset Value History, Risk Budget History, Risky Weight History
415 """
416 # set up the CPPI parameters
417 dates = risky_r.index
418 n_steps = len(dates)
419 account_value = start
420 floor_value = start*floor
421 peak = account_value
422 if isinstance(risky_r, pd.Series):
423 risky_r = pd.DataFrame(risky_r, columns=["R"])
424
425 if safe_r is None:
426 safe_r = pd.DataFrame().reindex_like(risky_r)
427 safe_r.values[:] = riskfree_rate/12 # fast way to set all values to a number
428 # set up some DataFrames for saving intermediate values
429 account_history = pd.DataFrame().reindex_like(risky_r)
430 risky_w_history = pd.DataFrame().reindex_like(risky_r)
431 cushion_history = pd.DataFrame().reindex_like(risky_r)
432 floorval_history = pd.DataFrame().reindex_like(risky_r)
433 peak_history = pd.DataFrame().reindex_like(risky_r)
434
435 for step in range(n_steps):
436 if drawdown is not None:
437 peak = np.maximum(peak, account_value)
438 floor_value = peak*(1-drawdown)
439 cushion = (account_value - floor_value)/account_value
440 risky_w = m*cushion
441 risky_w = np.minimum(risky_w, 1)
442 risky_w = np.maximum(risky_w, 0)
443 safe_w = 1-risky_w
444 risky_alloc = account_value*risky_w
445 safe_alloc = account_value*safe_w
446 # recompute the new account value at the end of this step
447 account_value = risky_alloc*(1+risky_r.iloc[step]) + safe_alloc*(1+safe_r.iloc[step])
448 # save the histories for analysis and plotting
449 cushion_history.iloc[step] = cushion
450 risky_w_history.iloc[step] = risky_w
451 account_history.iloc[step] = account_value
452 floorval_history.iloc[step] = floor_value
453 peak_history.iloc[step] = peak
454 risky_wealth = start*(1+risky_r).cumprod()
455 backtest_result = {
456 "Wealth": account_history,
457 "Risky Wealth": risky_wealth,
458 "Risk Budget": cushion_history,
459 "Risky Allocation": risky_w_history,
460 "m": m,
461 "start": start,
462 "floor": floor,
463 "risky_r":risky_r,
464 "safe_r": safe_r,
465 "drawdown": drawdown,
466 "peak": peak_history,
467 "floor": floorval_history
468 }
469 return backtest_result
470
471
472 def summary_stats(r, riskfree_rate=0.03):
473 """
474 Return a DataFrame that contains aggregated summary stats for the returns in the columns of r
475 """
476 ann_r = r.aggregate(annualize_rets, periods_per_year=12)
477 ann_vol = r.aggregate(annualize_vol, periods_per_year=12)
478 ann_sr = r.aggregate(sharpe_ratio, riskfree_rate=riskfree_rate, periods_per_year=12)
479 dd = r.aggregate(lambda r: drawdown(r).Drawdown.min())
480 skew = r.aggregate(skewness)
481 kurt = r.aggregate(kurtosis)
482 cf_var5 = r.aggregate(var_gaussian, modified=True)
483 hist_cvar5 = r.aggregate(cvar_historic)
484 return pd.DataFrame({
485 "Annualized Return": ann_r,
486 "Annualized Vol": ann_vol,
487 "Skewness": skew,
488 "Kurtosis": kurt,
489 "Cornish-Fisher VaR (5%)": cf_var5,
490 "Historic CVaR (5%)": hist_cvar5,
491 "Sharpe Ratio": ann_sr,
492 "Max Drawdown": dd
493 })
494
495
496 def gbm(n_years = 10, n_scenarios=1000, mu=0.07, sigma=0.15, steps_per_year=12, s_0=100.0, prices=True):
497 """
498 Evolution of Geometric Brownian Motion trajectories, such as for Stock Prices through Monte Carlo
499 :param n_years: The number of years to generate data for
500 :param n_paths: The number of scenarios/trajectories
501 :param mu: Annualized Drift, e.g. Market Return
502 :param sigma: Annualized Volatility
503 :param steps_per_year: granularity of the simulation
504 :param s_0: initial value
505 :return: a numpy array of n_paths columns and n_years*steps_per_year rows
506 """
507 # Derive per-step Model Parameters from User Specifications
508 dt = 1/steps_per_year
509 n_steps = int(n_years*steps_per_year) + 1
510 # the standard way ...
511 # rets_plus_1 = np.random.normal(loc=mu*dt+1, scale=sigma*np.sqrt(dt), size=(n_steps, n_scenarios))
512 # without discretization error ...
513 rets_plus_1 = np.random.normal(loc=(1+mu)**dt, scale=(sigma*np.sqrt(dt)), size=(n_steps, n_scenarios))
514 rets_plus_1[0] = 1
515 ret_val = s_0*pd.DataFrame(rets_plus_1).cumprod() if prices else rets_plus_1-1
516 return ret_val
517
518
519 import statsmodels.api as sm
520 def regress(dependent_variable, explanatory_variables, alpha=True):
521 """
522 Runs a linear regression to decompose the dependent variable into the explanatory variables
523 returns an object of type statsmodel's RegressionResults on which you can call
524 .summary() to print a full summary
525 .params for the coefficients
526 .tvalues and .pvalues for the significance levels
527 .rsquared_adj and .rsquared for quality of fit
528 """
529 if alpha:
530 explanatory_variables = explanatory_variables.copy()
531 explanatory_variables["Alpha"] = 1
532
533 lm = sm.OLS(dependent_variable, explanatory_variables).fit()
534 return lm
535
536 def portfolio_tracking_error(weights, ref_r, bb_r):
537 """
538 returns the tracking error between the reference returns
539 and a portfolio of building block returns held with given weights
540 """
541 return tracking_error(ref_r, (weights*bb_r).sum(axis=1))
542
543 def style_analysis(dependent_variable, explanatory_variables):
544 """
545 Returns the optimal weights that minimizes the Tracking error between
546 a portfolio of the explanatory variables and the dependent variable
547 """
548 n = explanatory_variables.shape[1]
549 init_guess = np.repeat(1/n, n)
550 bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
551 # construct the constraints
552 weights_sum_to_1 = {'type': 'eq',
553 'fun': lambda weights: np.sum(weights) - 1
554 }
555 solution = minimize(portfolio_tracking_error, init_guess,
556 args=(dependent_variable, explanatory_variables,), method='SLSQP',
557 options={'disp': False},
558 constraints=(weights_sum_to_1,),
559 bounds=bounds)
560 weights = pd.Series(solution.x, index=explanatory_variables.columns)
561 return weights
562
563
564 def ff_analysis(r, factors):
565 """
566 Returns the loadings of r on the Fama French Factors
567 which can be read in using get_fff_returns()
568 the index of r must be a (not necessarily proper) subset of the index of factors
569 r is either a Series or a DataFrame
570 """
571 if isinstance(r, pd.Series):
572 dependent_variable = r
573 explanatory_variables = factors.loc[r.index]
574 tilts = regress(dependent_variable, explanatory_variables).params
575 elif isinstance(r, pd.DataFrame):
576 tilts = pd.DataFrame({col: ff_analysis(r[col], factors) for col in r.columns})
577 else:
578 raise TypeError("r must be a Series or a DataFrame")
579 return tilts
580
581 def weight_ew(r, cap_weights=None, max_cw_mult=None, microcap_threshold=None, **kwargs):
582 """
583 Returns the weights of the EW portfolio based on the asset returns "r" as a DataFrame
584 If supplied a set of capweights and a capweight tether, it is applied and reweighted
585 """
586 n = len(r.columns)
587 ew = pd.Series(1/n, index=r.columns)
588 if cap_weights is not None:
589 cw = cap_weights.loc[r.index[0]] # starting cap weight
590 ## exclude microcaps
591 if microcap_threshold is not None and microcap_threshold > 0:
592 microcap = cw < microcap_threshold
593 ew[microcap] = 0
594 ew = ew/ew.sum()
595 #limit weight to a multiple of capweight
596 if max_cw_mult is not None and max_cw_mult > 0:
597 ew = np.minimum(ew, cw*max_cw_mult)
598 ew = ew/ew.sum() #reweight
599 return ew
600
601 def weight_cw(r, cap_weights, **kwargs):
602 """
603 Returns the weights of the CW portfolio based on the time series of capweights
604 """
605 w = cap_weights.loc[r.index[1]]
606 return cap_weights.loc[r.index[1]]
607
608 def backtest_ws(r, estimation_window=60, weighting=weight_ew, verbose=False, **kwargs):
609 """
610 Backtests a given weighting scheme, given some parameters:
611 r : asset returns to use to build the portfolio
612 estimation_window: the window to use to estimate parameters
613 weighting: the weighting scheme to use, must be a function that takes "r", and a variable number of keyword-value arguments
614 """
615 n_periods = r.shape[0]
616 # return windows
617 windows = [(start, start+estimation_window) for start in range(n_periods-estimation_window)]
618 weights = [weighting(r.iloc[win[0]:win[1]], **kwargs) for win in windows]
619 # convert List of weights to DataFrame
620 weights = pd.DataFrame(weights, index=r.iloc[estimation_window:].index, columns=r.columns)
621 returns = (weights * r).sum(axis="columns", min_count=1) #mincount is to generate NAs if all inputs are NAs
622 return returns