ml-finance-python
python scripts for finance machine learning
git clone https://9o.is/git/ml-finance-python.git
edhec_risk_kit_126.py
(20894B)
1 import pandas as pd
2 import numpy as np
3 import math
4
5 def get_ffme_returns():
6 """
7 Load the Fama-French Dataset for the returns of the Top and Bottom Deciles by MarketCap
8 """
9 me_m = pd.read_csv("data/Portfolios_Formed_on_ME_monthly_EW.csv",
10 header=0, index_col=0, na_values=-99.99)
11 rets = me_m[['Lo 10', 'Hi 10']]
12 rets.columns = ['SmallCap', 'LargeCap']
13 rets = rets/100
14 rets.index = pd.to_datetime(rets.index, format="%Y%m").to_period('M')
15 return rets
16
17
18 def get_hfi_returns():
19 """
20 Load and format the EDHEC Hedge Fund Index Returns
21 """
22 hfi = pd.read_csv("data/edhec-hedgefundindices.csv",
23 header=0, index_col=0, parse_dates=True)
24 hfi = hfi/100
25 hfi.index = hfi.index.to_period('M')
26 return hfi
27
28 def get_ind_file(filetype):
29 """
30 Load and format the Ken French 30 Industry Portfolios files
31 """
32 known_types = ["returns", "nfirms", "size"]
33 if filetype not in known_types:
34 sep = ','
35 raise ValueError(f'filetype must be one of:{sep.join(known_types)}')
36 if filetype is "returns":
37 name = "vw_rets"
38 divisor = 100
39 elif filetype is "nfirms":
40 name = "nfirms"
41 divisor = 1
42 elif filetype is "size":
43 name = "size"
44 divisor = 1
45 ind = pd.read_csv(f"data/ind30_m_{name}.csv", header=0, index_col=0)/divisor
46 ind.index = pd.to_datetime(ind.index, format="%Y%m").to_period('M')
47 ind.columns = ind.columns.str.strip()
48 return ind
49
50 def get_ind_returns():
51 """
52 Load and format the Ken French 30 Industry Portfolios Value Weighted Monthly Returns
53 """
54 return get_ind_file("returns")
55
56 def get_ind_nfirms():
57 """
58 Load and format the Ken French 30 Industry Portfolios Average number of Firms
59 """
60 return get_ind_file("nfirms")
61
62 def get_ind_size():
63 """
64 Load and format the Ken French 30 Industry Portfolios Average size (market cap)
65 """
66 return get_ind_file("size")
67
68
69 def get_total_market_index_returns():
70 """
71 Load the 30 industry portfolio data and derive the returns of a capweighted total market index
72 """
73 ind_nfirms = get_ind_nfirms()
74 ind_size = get_ind_size()
75 ind_return = get_ind_returns()
76 ind_mktcap = ind_nfirms * ind_size
77 total_mktcap = ind_mktcap.sum(axis=1)
78 ind_capweight = ind_mktcap.divide(total_mktcap, axis="rows")
79 total_market_return = (ind_capweight * ind_return).sum(axis="columns")
80 return total_market_return
81
82 def skewness(r):
83 """
84 Alternative to scipy.stats.skew()
85 Computes the skewness of the supplied Series or DataFrame
86 Returns a float or a Series
87 """
88 demeaned_r = r - r.mean()
89 # use the population standard deviation, so set dof=0
90 sigma_r = r.std(ddof=0)
91 exp = (demeaned_r**3).mean()
92 return exp/sigma_r**3
93
94
95 def kurtosis(r):
96 """
97 Alternative to scipy.stats.kurtosis()
98 Computes the kurtosis of the supplied Series or DataFrame
99 Returns a float or a Series
100 """
101 demeaned_r = r - r.mean()
102 # use the population standard deviation, so set dof=0
103 sigma_r = r.std(ddof=0)
104 exp = (demeaned_r**4).mean()
105 return exp/sigma_r**4
106
107
108 def compound(r):
109 """
110 returns the result of compounding the set of returns in r
111 """
112 return np.expm1(np.log1p(r).sum())
113
114
115 def annualize_rets(r, periods_per_year):
116 """
117 Annualizes a set of returns
118 We should infer the periods per year
119 but that is currently left as an exercise
120 to the reader :-)
121 """
122 compounded_growth = (1+r).prod()
123 n_periods = r.shape[0]
124 return compounded_growth**(periods_per_year/n_periods)-1
125
126
127 def annualize_vol(r, periods_per_year):
128 """
129 Annualizes the vol of a set of returns
130 We should infer the periods per year
131 but that is currently left as an exercise
132 to the reader :-)
133 """
134 return r.std()*(periods_per_year**0.5)
135
136
137 def sharpe_ratio(r, riskfree_rate, periods_per_year):
138 """
139 Computes the annualized sharpe ratio of a set of returns
140 """
141 # convert the annual riskfree rate to per period
142 rf_per_period = (1+riskfree_rate)**(1/periods_per_year)-1
143 excess_ret = r - rf_per_period
144 ann_ex_ret = annualize_rets(excess_ret, periods_per_year)
145 ann_vol = annualize_vol(r, periods_per_year)
146 return ann_ex_ret/ann_vol
147
148
149 import scipy.stats
150 def is_normal(r, level=0.01):
151 """
152 Applies the Jarque-Bera test to determine if a Series is normal or not
153 Test is applied at the 1% level by default
154 Returns True if the hypothesis of normality is accepted, False otherwise
155 """
156 if isinstance(r, pd.DataFrame):
157 return r.aggregate(is_normal)
158 else:
159 statistic, p_value = scipy.stats.jarque_bera(r)
160 return p_value > level
161
162
163 def drawdown(return_series: pd.Series):
164 """Takes a time series of asset returns.
165 returns a DataFrame with columns for
166 the wealth index,
167 the previous peaks, and
168 the percentage drawdown
169 """
170 wealth_index = 1000*(1+return_series).cumprod()
171 previous_peaks = wealth_index.cummax()
172 drawdowns = (wealth_index - previous_peaks)/previous_peaks
173 return pd.DataFrame({"Wealth": wealth_index,
174 "Previous Peak": previous_peaks,
175 "Drawdown": drawdowns})
176
177
178 def semideviation(r):
179 """
180 Returns the semideviation aka negative semideviation of r
181 r must be a Series or a DataFrame, else raises a TypeError
182 """
183 if isinstance(r, pd.Series):
184 is_negative = r < 0
185 return r[is_negative].std(ddof=0)
186 elif isinstance(r, pd.DataFrame):
187 return r.aggregate(semideviation)
188 else:
189 raise TypeError("Expected r to be a Series or DataFrame")
190
191
192 def var_historic(r, level=5):
193 """
194 Returns the historic Value at Risk at a specified level
195 i.e. returns the number such that "level" percent of the returns
196 fall below that number, and the (100-level) percent are above
197 """
198 if isinstance(r, pd.DataFrame):
199 return r.aggregate(var_historic, level=level)
200 elif isinstance(r, pd.Series):
201 return -np.percentile(r, level)
202 else:
203 raise TypeError("Expected r to be a Series or DataFrame")
204
205
206 def cvar_historic(r, level=5):
207 """
208 Computes the Conditional VaR of Series or DataFrame
209 """
210 if isinstance(r, pd.Series):
211 is_beyond = r <= -var_historic(r, level=level)
212 return -r[is_beyond].mean()
213 elif isinstance(r, pd.DataFrame):
214 return r.aggregate(cvar_historic, level=level)
215 else:
216 raise TypeError("Expected r to be a Series or DataFrame")
217
218
219 from scipy.stats import norm
220 def var_gaussian(r, level=5, modified=False):
221 """
222 Returns the Parametric Gauusian VaR of a Series or DataFrame
223 If "modified" is True, then the modified VaR is returned,
224 using the Cornish-Fisher modification
225 """
226 # compute the Z score assuming it was Gaussian
227 z = norm.ppf(level/100)
228 if modified:
229 # modify the Z score based on observed skewness and kurtosis
230 s = skewness(r)
231 k = kurtosis(r)
232 z = (z +
233 (z**2 - 1)*s/6 +
234 (z**3 -3*z)*(k-3)/24 -
235 (2*z**3 - 5*z)*(s**2)/36
236 )
237 return -(r.mean() + z*r.std(ddof=0))
238
239
240 def portfolio_return(weights, returns):
241 """
242 Computes the return on a portfolio from constituent returns and weights
243 weights are a numpy array or Nx1 matrix and returns are a numpy array or Nx1 matrix
244 """
245 return weights.T @ returns
246
247
248 def portfolio_vol(weights, covmat):
249 """
250 Computes the vol of a portfolio from a covariance matrix and constituent weights
251 weights are a numpy array or N x 1 maxtrix and covmat is an N x N matrix
252 """
253 return (weights.T @ covmat @ weights)**0.5
254
255
256 def plot_ef2(n_points, er, cov):
257 """
258 Plots the 2-asset efficient frontier
259 """
260 if er.shape[0] != 2 or er.shape[0] != 2:
261 raise ValueError("plot_ef2 can only plot 2-asset frontiers")
262 weights = [np.array([w, 1-w]) for w in np.linspace(0, 1, n_points)]
263 rets = [portfolio_return(w, er) for w in weights]
264 vols = [portfolio_vol(w, cov) for w in weights]
265 ef = pd.DataFrame({
266 "Returns": rets,
267 "Volatility": vols
268 })
269 return ef.plot.line(x="Volatility", y="Returns", style=".-")
270
271
272 from scipy.optimize import minimize
273
274 def minimize_vol(target_return, er, cov):
275 """
276 Returns the optimal weights that achieve the target return
277 given a set of expected returns and a covariance matrix
278 """
279 n = er.shape[0]
280 init_guess = np.repeat(1/n, n)
281 bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
282 # construct the constraints
283 weights_sum_to_1 = {'type': 'eq',
284 'fun': lambda weights: np.sum(weights) - 1
285 }
286 return_is_target = {'type': 'eq',
287 'args': (er,),
288 'fun': lambda weights, er: target_return - portfolio_return(weights,er)
289 }
290 weights = minimize(portfolio_vol, init_guess,
291 args=(cov,), method='SLSQP',
292 options={'disp': False},
293 constraints=(weights_sum_to_1,return_is_target),
294 bounds=bounds)
295 return weights.x
296
297
298
299 def msr(riskfree_rate, er, cov):
300 """
301 Returns the weights of the portfolio that gives you the maximum sharpe ratio
302 given the riskfree rate and expected returns and a covariance matrix
303 """
304 n = er.shape[0]
305 init_guess = np.repeat(1/n, n)
306 bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
307 # construct the constraints
308 weights_sum_to_1 = {'type': 'eq',
309 'fun': lambda weights: np.sum(weights) - 1
310 }
311 def neg_sharpe(weights, riskfree_rate, er, cov):
312 """
313 Returns the negative of the sharpe ratio
314 of the given portfolio
315 """
316 r = portfolio_return(weights, er)
317 vol = portfolio_vol(weights, cov)
318 return -(r - riskfree_rate)/vol
319
320 weights = minimize(neg_sharpe, init_guess,
321 args=(riskfree_rate, er, cov), method='SLSQP',
322 options={'disp': False},
323 constraints=(weights_sum_to_1,),
324 bounds=bounds)
325 return weights.x
326
327
328 def gmv(cov):
329 """
330 Returns the weights of the Global Minimum Volatility portfolio
331 given a covariance matrix
332 """
333 n = cov.shape[0]
334 return msr(0, np.repeat(1, n), cov)
335
336
337 def optimal_weights(n_points, er, cov):
338 """
339 Returns a list of weights that represent a grid of n_points on the efficient frontier
340 """
341 target_rs = np.linspace(er.min(), er.max(), n_points)
342 weights = [minimize_vol(target_return, er, cov) for target_return in target_rs]
343 return weights
344
345
346 def plot_ef(n_points, er, cov, style='.-', legend=False, show_cml=False, riskfree_rate=0, show_ew=False, show_gmv=False):
347 """
348 Plots the multi-asset efficient frontier
349 """
350 weights = optimal_weights(n_points, er, cov)
351 rets = [portfolio_return(w, er) for w in weights]
352 vols = [portfolio_vol(w, cov) for w in weights]
353 ef = pd.DataFrame({
354 "Returns": rets,
355 "Volatility": vols
356 })
357 ax = ef.plot.line(x="Volatility", y="Returns", style=style, legend=legend)
358 if show_cml:
359 ax.set_xlim(left = 0)
360 # get MSR
361 w_msr = msr(riskfree_rate, er, cov)
362 r_msr = portfolio_return(w_msr, er)
363 vol_msr = portfolio_vol(w_msr, cov)
364 # add CML
365 cml_x = [0, vol_msr]
366 cml_y = [riskfree_rate, r_msr]
367 ax.plot(cml_x, cml_y, color='green', marker='o', linestyle='dashed', linewidth=2, markersize=10)
368 if show_ew:
369 n = er.shape[0]
370 w_ew = np.repeat(1/n, n)
371 r_ew = portfolio_return(w_ew, er)
372 vol_ew = portfolio_vol(w_ew, cov)
373 # add EW
374 ax.plot([vol_ew], [r_ew], color='goldenrod', marker='o', markersize=10)
375 if show_gmv:
376 w_gmv = gmv(cov)
377 r_gmv = portfolio_return(w_gmv, er)
378 vol_gmv = portfolio_vol(w_gmv, cov)
379 # add EW
380 ax.plot([vol_gmv], [r_gmv], color='midnightblue', marker='o', markersize=10)
381
382 return ax
383
384
385 def run_cppi(risky_r, safe_r=None, m=3, start=1000, floor=0.8, riskfree_rate=0.03, drawdown=None):
386 """
387 Run a backtest of the CPPI strategy, given a set of returns for the risky asset
388 Returns a dictionary containing: Asset Value History, Risk Budget History, Risky Weight History
389 """
390 # set up the CPPI parameters
391 dates = risky_r.index
392 n_steps = len(dates)
393 account_value = start
394 floor_value = start*floor
395 peak = account_value
396 if isinstance(risky_r, pd.Series):
397 risky_r = pd.DataFrame(risky_r, columns=["R"])
398
399 if safe_r is None:
400 safe_r = pd.DataFrame().reindex_like(risky_r)
401 safe_r.values[:] = riskfree_rate/12 # fast way to set all values to a number
402 # set up some DataFrames for saving intermediate values
403 account_history = pd.DataFrame().reindex_like(risky_r)
404 risky_w_history = pd.DataFrame().reindex_like(risky_r)
405 cushion_history = pd.DataFrame().reindex_like(risky_r)
406 floorval_history = pd.DataFrame().reindex_like(risky_r)
407 peak_history = pd.DataFrame().reindex_like(risky_r)
408
409 for step in range(n_steps):
410 if drawdown is not None:
411 peak = np.maximum(peak, account_value)
412 floor_value = peak*(1-drawdown)
413 cushion = (account_value - floor_value)/account_value
414 risky_w = m*cushion
415 risky_w = np.minimum(risky_w, 1)
416 risky_w = np.maximum(risky_w, 0)
417 safe_w = 1-risky_w
418 risky_alloc = account_value*risky_w
419 safe_alloc = account_value*safe_w
420 # recompute the new account value at the end of this step
421 account_value = risky_alloc*(1+risky_r.iloc[step]) + safe_alloc*(1+safe_r.iloc[step])
422 # save the histories for analysis and plotting
423 cushion_history.iloc[step] = cushion
424 risky_w_history.iloc[step] = risky_w
425 account_history.iloc[step] = account_value
426 floorval_history.iloc[step] = floor_value
427 peak_history.iloc[step] = peak
428 risky_wealth = start*(1+risky_r).cumprod()
429 backtest_result = {
430 "Wealth": account_history,
431 "Risky Wealth": risky_wealth,
432 "Risk Budget": cushion_history,
433 "Risky Allocation": risky_w_history,
434 "m": m,
435 "start": start,
436 "floor": floor,
437 "risky_r":risky_r,
438 "safe_r": safe_r,
439 "drawdown": drawdown,
440 "peak": peak_history,
441 "floor": floorval_history
442 }
443 return backtest_result
444
445
446 def summary_stats(r, riskfree_rate=0.03):
447 """
448 Return a DataFrame that contains aggregated summary stats for the returns in the columns of r
449 """
450 ann_r = r.aggregate(annualize_rets, periods_per_year=12)
451 ann_vol = r.aggregate(annualize_vol, periods_per_year=12)
452 ann_sr = r.aggregate(sharpe_ratio, riskfree_rate=riskfree_rate, periods_per_year=12)
453 dd = r.aggregate(lambda r: drawdown(r).Drawdown.min())
454 skew = r.aggregate(skewness)
455 kurt = r.aggregate(kurtosis)
456 cf_var5 = r.aggregate(var_gaussian, modified=True)
457 hist_cvar5 = r.aggregate(cvar_historic)
458 return pd.DataFrame({
459 "Annualized Return": ann_r,
460 "Annualized Vol": ann_vol,
461 "Skewness": skew,
462 "Kurtosis": kurt,
463 "Cornish-Fisher VaR (5%)": cf_var5,
464 "Historic CVaR (5%)": hist_cvar5,
465 "Sharpe Ratio": ann_sr,
466 "Max Drawdown": dd
467 })
468
469
470 def gbm(n_years = 10, n_scenarios=1000, mu=0.07, sigma=0.15, steps_per_year=12, s_0=100.0, prices=True):
471 """
472 Evolution of Geometric Brownian Motion trajectories, such as for Stock Prices through Monte Carlo
473 :param n_years: The number of years to generate data for
474 :param n_paths: The number of scenarios/trajectories
475 :param mu: Annualized Drift, e.g. Market Return
476 :param sigma: Annualized Volatility
477 :param steps_per_year: granularity of the simulation
478 :param s_0: initial value
479 :return: a numpy array of n_paths columns and n_years*steps_per_year rows
480 """
481 # Derive per-step Model Parameters from User Specifications
482 dt = 1/steps_per_year
483 n_steps = int(n_years*steps_per_year) + 1
484 # the standard way ...
485 # rets_plus_1 = np.random.normal(loc=mu*dt+1, scale=sigma*np.sqrt(dt), size=(n_steps, n_scenarios))
486 # without discretization error ...
487 rets_plus_1 = np.random.normal(loc=(1+mu)**dt, scale=(sigma*np.sqrt(dt)), size=(n_steps, n_scenarios))
488 rets_plus_1[0] = 1
489 ret_val = s_0*pd.DataFrame(rets_plus_1).cumprod() if prices else rets_plus_1-1
490 return ret_val
491
492 def discount(t, r):
493 """
494 Compute the price of a pure discount bond that pays a dollar at time t where t is in years and r is the annual interest rate
495 """
496 return (1+r)**(-t)
497
498 def pv(l, r):
499 """
500 Compute the present value of a list of liabilities given by the time (as an index) and amounts
501 """
502 dates = l.index
503 discounts = discount(dates, r)
504 return (discounts*l).sum()
505
506 def funding_ratio(assets, liabilities, r):
507 """
508 Computes the funding ratio of a series of liabilities, based on an interest rate and current value of assets
509 """
510 return pv(assets, r)/pv(liabilities, r)
511
512 def inst_to_ann(r):
513 """
514 Convert an instantaneous interest rate to an annual interest rate
515 """
516 return np.expm1(r)
517
518 def ann_to_inst(r):
519 """
520 Convert an instantaneous interest rate to an annual interest rate
521 """
522 return np.log1p(r)
523
524 def cir(n_years = 10, n_scenarios=1, a=0.05, b=0.03, sigma=0.05, steps_per_year=12, r_0=None):
525 """
526 Generate random interest rate evolution over time using the CIR model
527 b and r_0 are assumed to be the annualized rates, not the short rate
528 and the returned values are the annualized rates as well
529 """
530 if r_0 is None: r_0 = b
531 r_0 = ann_to_inst(r_0)
532 dt = 1/steps_per_year
533 num_steps = int(n_years*steps_per_year) + 1 # because n_years might be a float
534
535 shock = np.random.normal(0, scale=np.sqrt(dt), size=(num_steps, n_scenarios))
536 rates = np.empty_like(shock)
537 rates[0] = r_0
538
539 ## For Price Generation
540 h = math.sqrt(a**2 + 2*sigma**2)
541 prices = np.empty_like(shock)
542 ####
543
544 def price(ttm, r):
545 _A = ((2*h*math.exp((h+a)*ttm/2))/(2*h+(h+a)*(math.exp(h*ttm)-1)))**(2*a*b/sigma**2)
546 _B = (2*(math.exp(h*ttm)-1))/(2*h + (h+a)*(math.exp(h*ttm)-1))
547 _P = _A*np.exp(-_B*r)
548 return _P
549 prices[0] = price(n_years, r_0)
550 ####
551
552 for step in range(1, num_steps):
553 r_t = rates[step-1]
554 d_r_t = a*(b-r_t)*dt + sigma*np.sqrt(r_t)*shock[step]
555 rates[step] = abs(r_t + d_r_t)
556 # generate prices at time t as well ...
557 prices[step] = price(n_years-step*dt, rates[step])
558
559 rates = pd.DataFrame(data=inst_to_ann(rates), index=range(num_steps))
560 ### for prices
561 prices = pd.DataFrame(data=prices, index=range(num_steps))
562 ###
563 return rates, prices
564
565 def bond_cash_flows(maturity, principal=100, coupon_rate=0.03, coupons_per_year=12):
566 """
567 Returns the series of cash flows generated by a bond,
568 indexed by the payment/coupon number
569 """
570 n_coupons = round(maturity*coupons_per_year)
571 coupon_amt = principal*coupon_rate/coupons_per_year
572 coupons = np.repeat(coupon_amt, n_coupons)
573 coupon_times = np.arange(1, n_coupons+1)
574 cash_flows = pd.Series(data=coupon_amt, index=coupon_times)
575 cash_flows.iloc[-1] += principal
576 return cash_flows
577
578 def bond_price(maturity, principal=100, coupon_rate=0.03, coupons_per_year=12, discount_rate=0.03):
579 """
580 Computes the price of a bond that pays regular coupons until maturity
581 at which time the principal and the final coupon is returned
582 This is not designed to be efficient, rather,
583 it is to illustrate the underlying principle behind bond pricing!
584 """
585 cash_flows = bond_cash_flows(maturity, principal, coupon_rate, coupons_per_year)
586 return pv(cash_flows, discount_rate/coupons_per_year)
587
588 def macaulay_duration(flows, discount_rate):
589 """
590 Computes the Macaulay Duration of a sequence of cash flows, given a per-period discount rate
591 """
592 discounted_flows = discount(flows.index, discount_rate)*flows
593 weights = discounted_flows/discounted_flows.sum()
594 return np.average(flows.index, weights=weights)
595
596 def match_durations(cf_t, cf_s, cf_l, discount_rate):
597 """
598 Returns the weight W in cf_s that, along with (1-W) in cf_l will have an effective
599 duration that matches cf_t
600 """
601 d_t = macaulay_duration(cf_t, discount_rate)
602 d_s = macaulay_duration(cf_s, discount_rate)
603 d_l = macaulay_duration(cf_l, discount_rate)
604 return (d_l - d_t)/(d_l - d_s)