ml-finance-python
python scripts for finance machine learning
git clone https://9o.is/git/ml-finance-python.git
edhec_risk_kit_125.py
(19148B)
1 import pandas as pd
2 import numpy as np
3
4 def get_ffme_returns():
5 """
6 Load the Fama-French Dataset for the returns of the Top and Bottom Deciles by MarketCap
7 """
8 me_m = pd.read_csv("data/Portfolios_Formed_on_ME_monthly_EW.csv",
9 header=0, index_col=0, na_values=-99.99)
10 rets = me_m[['Lo 10', 'Hi 10']]
11 rets.columns = ['SmallCap', 'LargeCap']
12 rets = rets/100
13 rets.index = pd.to_datetime(rets.index, format="%Y%m").to_period('M')
14 return rets
15
16
17 def get_hfi_returns():
18 """
19 Load and format the EDHEC Hedge Fund Index Returns
20 """
21 hfi = pd.read_csv("data/edhec-hedgefundindices.csv",
22 header=0, index_col=0, parse_dates=True)
23 hfi = hfi/100
24 hfi.index = hfi.index.to_period('M')
25 return hfi
26
27 def get_ind_file(filetype):
28 """
29 Load and format the Ken French 30 Industry Portfolios files
30 """
31 known_types = ["returns", "nfirms", "size"]
32 if filetype not in known_types:
33 raise ValueError(f"filetype must be one of:{','.join(known_types)}")
34 if filetype is "returns":
35 name = "vw_rets"
36 divisor = 100
37 elif filetype is "nfirms":
38 name = "nfirms"
39 divisor = 1
40 elif filetype is "size":
41 name = "size"
42 divisor = 1
43
44 ind = pd.read_csv(f"data/ind30_m_{name}.csv", header=0, index_col=0)/divisor
45 ind.index = pd.to_datetime(ind.index, format="%Y%m").to_period('M')
46 ind.columns = ind.columns.str.strip()
47 return ind
48
49 def get_ind_returns():
50 """
51 Load and format the Ken French 30 Industry Portfolios Value Weighted Monthly Returns
52 """
53 return get_ind_file("returns")
54
55 def get_ind_nfirms():
56 """
57 Load and format the Ken French 30 Industry Portfolios Average number of Firms
58 """
59 return get_ind_file("nfirms")
60
61 def get_ind_size():
62 """
63 Load and format the Ken French 30 Industry Portfolios Average size (market cap)
64 """
65 return get_ind_file("size")
66
67
68 def get_total_market_index_returns():
69 """
70 Load the 30 industry portfolio data and derive the returns of a capweighted total market index
71 """
72 ind_nfirms = get_ind_nfirms()
73 ind_size = get_ind_size()
74 ind_return = get_ind_returns()
75 ind_mktcap = ind_nfirms * ind_size
76 total_mktcap = ind_mktcap.sum(axis=1)
77 ind_capweight = ind_mktcap.divide(total_mktcap, axis="rows")
78 total_market_return = (ind_capweight * ind_return).sum(axis="columns")
79 return total_market_return
80
81 def skewness(r):
82 """
83 Alternative to scipy.stats.skew()
84 Computes the skewness of the supplied Series or DataFrame
85 Returns a float or a Series
86 """
87 demeaned_r = r - r.mean()
88 # use the population standard deviation, so set dof=0
89 sigma_r = r.std(ddof=0)
90 exp = (demeaned_r**3).mean()
91 return exp/sigma_r**3
92
93
94 def kurtosis(r):
95 """
96 Alternative to scipy.stats.kurtosis()
97 Computes the kurtosis of the supplied Series or DataFrame
98 Returns a float or a Series
99 """
100 demeaned_r = r - r.mean()
101 # use the population standard deviation, so set dof=0
102 sigma_r = r.std(ddof=0)
103 exp = (demeaned_r**4).mean()
104 return exp/sigma_r**4
105
106
107 def compound(r):
108 """
109 returns the result of compounding the set of returns in r
110 """
111 return np.expm1(np.log1p(r).sum())
112
113
114 def annualize_rets(r, periods_per_year):
115 """
116 Annualizes a set of returns
117 We should infer the periods per year
118 but that is currently left as an exercise
119 to the reader :-)
120 """
121 compounded_growth = (1+r).prod()
122 n_periods = r.shape[0]
123 return compounded_growth**(periods_per_year/n_periods)-1
124
125
126 def annualize_vol(r, periods_per_year):
127 """
128 Annualizes the vol of a set of returns
129 We should infer the periods per year
130 but that is currently left as an exercise
131 to the reader :-)
132 """
133 return r.std()*(periods_per_year**0.5)
134
135
136 def sharpe_ratio(r, riskfree_rate, periods_per_year):
137 """
138 Computes the annualized sharpe ratio of a set of returns
139 """
140 # convert the annual riskfree rate to per period
141 rf_per_period = (1+riskfree_rate)**(1/periods_per_year)-1
142 excess_ret = r - rf_per_period
143 ann_ex_ret = annualize_rets(excess_ret, periods_per_year)
144 ann_vol = annualize_vol(r, periods_per_year)
145 return ann_ex_ret/ann_vol
146
147
148 import scipy.stats
149 def is_normal(r, level=0.01):
150 """
151 Applies the Jarque-Bera test to determine if a Series is normal or not
152 Test is applied at the 1% level by default
153 Returns True if the hypothesis of normality is accepted, False otherwise
154 """
155 if isinstance(r, pd.DataFrame):
156 return r.aggregate(is_normal)
157 else:
158 statistic, p_value = scipy.stats.jarque_bera(r)
159 return p_value > level
160
161
162 def drawdown(return_series: pd.Series):
163 """Takes a time series of asset returns.
164 returns a DataFrame with columns for
165 the wealth index,
166 the previous peaks, and
167 the percentage drawdown
168 """
169 wealth_index = 1000*(1+return_series).cumprod()
170 previous_peaks = wealth_index.cummax()
171 drawdowns = (wealth_index - previous_peaks)/previous_peaks
172 return pd.DataFrame({"Wealth": wealth_index,
173 "Previous Peak": previous_peaks,
174 "Drawdown": drawdowns})
175
176
177 def semideviation(r):
178 """
179 Returns the semideviation aka negative semideviation of r
180 r must be a Series or a DataFrame, else raises a TypeError
181 """
182 if isinstance(r, pd.Series):
183 is_negative = r < 0
184 return r[is_negative].std(ddof=0)
185 elif isinstance(r, pd.DataFrame):
186 return r.aggregate(semideviation)
187 else:
188 raise TypeError("Expected r to be a Series or DataFrame")
189
190
191 def var_historic(r, level=5):
192 """
193 Returns the historic Value at Risk at a specified level
194 i.e. returns the number such that "level" percent of the returns
195 fall below that number, and the (100-level) percent are above
196 """
197 if isinstance(r, pd.DataFrame):
198 return r.aggregate(var_historic, level=level)
199 elif isinstance(r, pd.Series):
200 return -np.percentile(r, level)
201 else:
202 raise TypeError("Expected r to be a Series or DataFrame")
203
204
205 def cvar_historic(r, level=5):
206 """
207 Computes the Conditional VaR of Series or DataFrame
208 """
209 if isinstance(r, pd.Series):
210 is_beyond = r <= -var_historic(r, level=level)
211 return -r[is_beyond].mean()
212 elif isinstance(r, pd.DataFrame):
213 return r.aggregate(cvar_historic, level=level)
214 else:
215 raise TypeError("Expected r to be a Series or DataFrame")
216
217
218 from scipy.stats import norm
219 def var_gaussian(r, level=5, modified=False):
220 """
221 Returns the Parametric Gauusian VaR of a Series or DataFrame
222 If "modified" is True, then the modified VaR is returned,
223 using the Cornish-Fisher modification
224 """
225 # compute the Z score assuming it was Gaussian
226 z = norm.ppf(level/100)
227 if modified:
228 # modify the Z score based on observed skewness and kurtosis
229 s = skewness(r)
230 k = kurtosis(r)
231 z = (z +
232 (z**2 - 1)*s/6 +
233 (z**3 -3*z)*(k-3)/24 -
234 (2*z**3 - 5*z)*(s**2)/36
235 )
236 return -(r.mean() + z*r.std(ddof=0))
237
238
239 def portfolio_return(weights, returns):
240 """
241 Computes the return on a portfolio from constituent returns and weights
242 weights are a numpy array or Nx1 matrix and returns are a numpy array or Nx1 matrix
243 """
244 return weights.T @ returns
245
246
247 def portfolio_vol(weights, covmat):
248 """
249 Computes the vol of a portfolio from a covariance matrix and constituent weights
250 weights are a numpy array or N x 1 maxtrix and covmat is an N x N matrix
251 """
252 return (weights.T @ covmat @ weights)**0.5
253
254
255 def plot_ef2(n_points, er, cov):
256 """
257 Plots the 2-asset efficient frontier
258 """
259 if er.shape[0] != 2 or er.shape[0] != 2:
260 raise ValueError("plot_ef2 can only plot 2-asset frontiers")
261 weights = [np.array([w, 1-w]) for w in np.linspace(0, 1, n_points)]
262 rets = [portfolio_return(w, er) for w in weights]
263 vols = [portfolio_vol(w, cov) for w in weights]
264 ef = pd.DataFrame({
265 "Returns": rets,
266 "Volatility": vols
267 })
268 return ef.plot.line(x="Volatility", y="Returns", style=".-")
269
270
271 from scipy.optimize import minimize
272
273 def minimize_vol(target_return, er, cov):
274 """
275 Returns the optimal weights that achieve the target return
276 given a set of expected returns and a covariance matrix
277 """
278 n = er.shape[0]
279 init_guess = np.repeat(1/n, n)
280 bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
281 # construct the constraints
282 weights_sum_to_1 = {'type': 'eq',
283 'fun': lambda weights: np.sum(weights) - 1
284 }
285 return_is_target = {'type': 'eq',
286 'args': (er,),
287 'fun': lambda weights, er: target_return - portfolio_return(weights,er)
288 }
289 weights = minimize(portfolio_vol, init_guess,
290 args=(cov,), method='SLSQP',
291 options={'disp': False},
292 constraints=(weights_sum_to_1,return_is_target),
293 bounds=bounds)
294 return weights.x
295
296
297 def msr(riskfree_rate, er, cov):
298 """
299 Returns the weights of the portfolio that gives you the maximum sharpe ratio
300 given the riskfree rate and expected returns and a covariance matrix
301 """
302 n = er.shape[0]
303 init_guess = np.repeat(1/n, n)
304 bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
305 # construct the constraints
306 weights_sum_to_1 = {'type': 'eq',
307 'fun': lambda weights: np.sum(weights) - 1
308 }
309 def neg_sharpe(weights, riskfree_rate, er, cov):
310 """
311 Returns the negative of the sharpe ratio
312 of the given portfolio
313 """
314 r = portfolio_return(weights, er)
315 vol = portfolio_vol(weights, cov)
316 return -(r - riskfree_rate)/vol
317
318 weights = minimize(neg_sharpe, init_guess,
319 args=(riskfree_rate, er, cov), method='SLSQP',
320 options={'disp': False},
321 constraints=(weights_sum_to_1,),
322 bounds=bounds)
323 return weights.x
324
325
326 def gmv(cov):
327 """
328 Returns the weights of the Global Minimum Volatility portfolio
329 given a covariance matrix
330 """
331 n = cov.shape[0]
332 return msr(0, np.repeat(1, n), cov)
333
334
335 def optimal_weights(n_points, er, cov):
336 """
337 Returns a list of weights that represent a grid of n_points on the efficient frontier
338 """
339 target_rs = np.linspace(er.min(), er.max(), n_points)
340 weights = [minimize_vol(target_return, er, cov) for target_return in target_rs]
341 return weights
342
343
344 def plot_ef(n_points, er, cov, style='.-', legend=False, show_cml=False, riskfree_rate=0, show_ew=False, show_gmv=False):
345 """
346 Plots the multi-asset efficient frontier
347 """
348 weights = optimal_weights(n_points, er, cov)
349 rets = [portfolio_return(w, er) for w in weights]
350 vols = [portfolio_vol(w, cov) for w in weights]
351 ef = pd.DataFrame({
352 "Returns": rets,
353 "Volatility": vols
354 })
355 ax = ef.plot.line(x="Volatility", y="Returns", style=style, legend=legend)
356 if show_cml:
357 ax.set_xlim(left = 0)
358 # get MSR
359 w_msr = msr(riskfree_rate, er, cov)
360 r_msr = portfolio_return(w_msr, er)
361 vol_msr = portfolio_vol(w_msr, cov)
362 # add CML
363 cml_x = [0, vol_msr]
364 cml_y = [riskfree_rate, r_msr]
365 ax.plot(cml_x, cml_y, color='green', marker='o', linestyle='dashed', linewidth=2, markersize=10)
366 if show_ew:
367 n = er.shape[0]
368 w_ew = np.repeat(1/n, n)
369 r_ew = portfolio_return(w_ew, er)
370 vol_ew = portfolio_vol(w_ew, cov)
371 # add EW
372 ax.plot([vol_ew], [r_ew], color='goldenrod', marker='o', markersize=10)
373 if show_gmv:
374 w_gmv = gmv(cov)
375 r_gmv = portfolio_return(w_gmv, er)
376 vol_gmv = portfolio_vol(w_gmv, cov)
377 # add EW
378 ax.plot([vol_gmv], [r_gmv], color='midnightblue', marker='o', markersize=10)
379
380 return ax
381
382
383 def run_cppi(risky_r, safe_r=None, m=3, start=1000, floor=0.8, riskfree_rate=0.03, drawdown=None):
384 """
385 Run a backtest of the CPPI strategy, given a set of returns for the risky asset
386 Returns a dictionary containing: Asset Value History, Risk Budget History, Risky Weight History
387 """
388 # set up the CPPI parameters
389 dates = risky_r.index
390 n_steps = len(dates)
391 account_value = start
392 floor_value = start*floor
393 peak = account_value
394 if isinstance(risky_r, pd.Series):
395 risky_r = pd.DataFrame(risky_r, columns=["R"])
396
397 if safe_r is None:
398 safe_r = pd.DataFrame().reindex_like(risky_r)
399 safe_r.values[:] = riskfree_rate/12 # fast way to set all values to a number
400 # set up some DataFrames for saving intermediate values
401 account_history = pd.DataFrame().reindex_like(risky_r)
402 risky_w_history = pd.DataFrame().reindex_like(risky_r)
403 cushion_history = pd.DataFrame().reindex_like(risky_r)
404 floorval_history = pd.DataFrame().reindex_like(risky_r)
405 peak_history = pd.DataFrame().reindex_like(risky_r)
406
407 for step in range(n_steps):
408 if drawdown is not None:
409 peak = np.maximum(peak, account_value)
410 floor_value = peak*(1-drawdown)
411 cushion = (account_value - floor_value)/account_value
412 risky_w = m*cushion
413 risky_w = np.minimum(risky_w, 1)
414 risky_w = np.maximum(risky_w, 0)
415 safe_w = 1-risky_w
416 risky_alloc = account_value*risky_w
417 safe_alloc = account_value*safe_w
418 # recompute the new account value at the end of this step
419 account_value = risky_alloc*(1+risky_r.iloc[step]) + safe_alloc*(1+safe_r.iloc[step])
420 # save the histories for analysis and plotting
421 cushion_history.iloc[step] = cushion
422 risky_w_history.iloc[step] = risky_w
423 account_history.iloc[step] = account_value
424 floorval_history.iloc[step] = floor_value
425 peak_history.iloc[step] = peak
426 risky_wealth = start*(1+risky_r).cumprod()
427 backtest_result = {
428 "Wealth": account_history,
429 "Risky Wealth": risky_wealth,
430 "Risk Budget": cushion_history,
431 "Risky Allocation": risky_w_history,
432 "m": m,
433 "start": start,
434 "floor": floor,
435 "risky_r":risky_r,
436 "safe_r": safe_r,
437 "drawdown": drawdown,
438 "peak": peak_history,
439 "floor": floorval_history
440 }
441 return backtest_result
442
443
444 def summary_stats(r, riskfree_rate=0.03):
445 """
446 Return a DataFrame that contains aggregated summary stats for the returns in the columns of r
447 """
448 ann_r = r.aggregate(annualize_rets, periods_per_year=12)
449 ann_vol = r.aggregate(annualize_vol, periods_per_year=12)
450 ann_sr = r.aggregate(sharpe_ratio, riskfree_rate=riskfree_rate, periods_per_year=12)
451 dd = r.aggregate(lambda r: drawdown(r).Drawdown.min())
452 skew = r.aggregate(skewness)
453 kurt = r.aggregate(kurtosis)
454 cf_var5 = r.aggregate(var_gaussian, modified=True)
455 hist_cvar5 = r.aggregate(cvar_historic)
456 return pd.DataFrame({
457 "Annualized Return": ann_r,
458 "Annualized Vol": ann_vol,
459 "Skewness": skew,
460 "Kurtosis": kurt,
461 "Cornish-Fisher VaR (5%)": cf_var5,
462 "Historic CVaR (5%)": hist_cvar5,
463 "Sharpe Ratio": ann_sr,
464 "Max Drawdown": dd
465 })
466
467
468 def gbm(n_years = 10, n_scenarios=1000, mu=0.07, sigma=0.15, steps_per_year=12, s_0=100.0, prices=True):
469 """
470 Evolution of Geometric Brownian Motion trajectories, such as for Stock Prices through Monte Carlo
471 :param n_years: The number of years to generate data for
472 :param n_paths: The number of scenarios/trajectories
473 :param mu: Annualized Drift, e.g. Market Return
474 :param sigma: Annualized Volatility
475 :param steps_per_year: granularity of the simulation
476 :param s_0: initial value
477 :return: a numpy array of n_paths columns and n_years*steps_per_year rows
478 """
479 # Derive per-step Model Parameters from User Specifications
480 dt = 1/steps_per_year
481 n_steps = int(n_years*steps_per_year) + 1
482 # the standard way ...
483 # rets_plus_1 = np.random.normal(loc=mu*dt+1, scale=sigma*np.sqrt(dt), size=(n_steps, n_scenarios))
484 # without discretization error ...
485 rets_plus_1 = np.random.normal(loc=(1+mu)**dt, scale=(sigma*np.sqrt(dt)), size=(n_steps, n_scenarios))
486 rets_plus_1[0] = 1
487 ret_val = s_0*pd.DataFrame(rets_plus_1).cumprod() if prices else rets_plus_1-1
488 return ret_val
489
490 def discount(t, r):
491 """
492 Compute the price of a pure discount bond that pays a dollar at time t where t is in years and r is the annual interest rate
493 """
494 return (1+r)**(-t)
495
496 def pv(l, r):
497 """
498 Compute the present value of a list of liabilities given by the time (as an index) and amounts
499 """
500 dates = l.index
501 discounts = discount(dates, r)
502 return (discounts*l).sum()
503
504 def funding_ratio(assets, liabilities, r):
505 """
506 Computes the funding ratio of a series of liabilities, based on an interest rate and current value of assets
507 """
508 return assets/pv(liabilities, r)
509
510 def inst_to_ann(r):
511 """
512 Convert an instantaneous interest rate to an annual interest rate
513 """
514 return np.expm1(r)
515
516 def ann_to_inst(r):
517 """
518 Convert an instantaneous interest rate to an annual interest rate
519 """
520 return np.log1p(r)
521
522 import math
523 def cir(n_years = 10, n_scenarios=1, a=0.05, b=0.03, sigma=0.05, steps_per_year=12, r_0=None):
524 """
525 Generate random interest rate evolution over time using the CIR model
526 b and r_0 are assumed to be the annualized rates, not the short rate
527 and the returned values are the annualized rates as well
528 """
529 if r_0 is None: r_0 = b
530 r_0 = ann_to_inst(r_0)
531 dt = 1/steps_per_year
532 num_steps = int(n_years*steps_per_year) + 1 # because n_years might be a float
533
534 shock = np.random.normal(0, scale=np.sqrt(dt), size=(num_steps, n_scenarios))
535 rates = np.empty_like(shock)
536 rates[0] = r_0
537
538 ## For Price Generation
539 h = math.sqrt(a**2 + 2*sigma**2)
540 prices = np.empty_like(shock)
541 ####
542
543 def price(ttm, r):
544 _A = ((2*h*math.exp((h+a)*ttm/2))/(2*h+(h+a)*(math.exp(h*ttm)-1)))**(2*a*b/sigma**2)
545 _B = (2*(math.exp(h*ttm)-1))/(2*h + (h+a)*(math.exp(h*ttm)-1))
546 _P = _A*np.exp(-_B*r)
547 return _P
548 prices[0] = price(n_years, r_0)
549 ####
550
551 for step in range(1, num_steps):
552 r_t = rates[step-1]
553 d_r_t = a*(b-r_t)*dt + sigma*np.sqrt(r_t)*shock[step]
554 rates[step] = abs(r_t + d_r_t)
555 # generate prices at time t as well ...
556 prices[step] = price(n_years-step*dt, rates[step])
557
558 rates = pd.DataFrame(data=inst_to_ann(rates), index=range(num_steps))
559 ### for prices
560 prices = pd.DataFrame(data=prices, index=range(num_steps))
561 ###
562 return rates, prices