ml-finance-python
python scripts for finance machine learning
git clone https://9o.is/git/ml-finance-python.git
edhec_risk_kit_121.py
(16713B)
1 import pandas as pd
2 import numpy as np
3
4 def get_ffme_returns():
5 """
6 Load the Fama-French Dataset for the returns of the Top and Bottom Deciles by MarketCap
7 """
8 me_m = pd.read_csv("data/Portfolios_Formed_on_ME_monthly_EW.csv",
9 header=0, index_col=0, na_values=-99.99)
10 rets = me_m[['Lo 10', 'Hi 10']]
11 rets.columns = ['SmallCap', 'LargeCap']
12 rets = rets/100
13 rets.index = pd.to_datetime(rets.index, format="%Y%m").to_period('M')
14 return rets
15
16
17 def get_hfi_returns():
18 """
19 Load and format the EDHEC Hedge Fund Index Returns
20 """
21 hfi = pd.read_csv("data/edhec-hedgefundindices.csv",
22 header=0, index_col=0, parse_dates=True)
23 hfi = hfi/100
24 hfi.index = hfi.index.to_period('M')
25 return hfi
26
27 def get_ind_file(filetype):
28 """
29 Load and format the Ken French 30 Industry Portfolios files
30 """
31 known_types = ["returns", "nfirms", "size"]
32 if filetype not in known_types:
33 raise ValueError(f"filetype must be one of:{','.join(known_types)}")
34 if filetype is "returns":
35 name = "vw_rets"
36 divisor = 100
37 elif filetype is "nfirms":
38 name = "nfirms"
39 divisor = 1
40 elif filetype is "size":
41 name = "size"
42 divisor = 1
43
44 ind = pd.read_csv(f"data/ind30_m_{name}.csv", header=0, index_col=0)/divisor
45 ind.index = pd.to_datetime(ind.index, format="%Y%m").to_period('M')
46 ind.columns = ind.columns.str.strip()
47 return ind
48
49 def get_ind_returns():
50 """
51 Load and format the Ken French 30 Industry Portfolios Value Weighted Monthly Returns
52 """
53 return get_ind_file("returns")
54
55 def get_ind_nfirms():
56 """
57 Load and format the Ken French 30 Industry Portfolios Average number of Firms
58 """
59 return get_ind_file("nfirms")
60
61 def get_ind_size():
62 """
63 Load and format the Ken French 30 Industry Portfolios Average size (market cap)
64 """
65 return get_ind_file("size")
66
67
68 def get_total_market_index_returns():
69 """
70 Load the 30 industry portfolio data and derive the returns of a capweighted total market index
71 """
72 ind_nfirms = get_ind_nfirms()
73 ind_size = get_ind_size()
74 ind_return = get_ind_returns()
75 ind_mktcap = ind_nfirms * ind_size
76 total_mktcap = ind_mktcap.sum(axis=1)
77 ind_capweight = ind_mktcap.divide(total_mktcap, axis="rows")
78 total_market_return = (ind_capweight * ind_return).sum(axis="columns")
79 return total_market_return
80
81 def skewness(r):
82 """
83 Alternative to scipy.stats.skew()
84 Computes the skewness of the supplied Series or DataFrame
85 Returns a float or a Series
86 """
87 demeaned_r = r - r.mean()
88 # use the population standard deviation, so set dof=0
89 sigma_r = r.std(ddof=0)
90 exp = (demeaned_r**3).mean()
91 return exp/sigma_r**3
92
93
94 def kurtosis(r):
95 """
96 Alternative to scipy.stats.kurtosis()
97 Computes the kurtosis of the supplied Series or DataFrame
98 Returns a float or a Series
99 """
100 demeaned_r = r - r.mean()
101 # use the population standard deviation, so set dof=0
102 sigma_r = r.std(ddof=0)
103 exp = (demeaned_r**4).mean()
104 return exp/sigma_r**4
105
106 def compound(r):
107 """
108 returns the result of compounding the set of returns in r
109 """
110 return np.expm1(np.log1p(r).sum())
111
112
113 def annualize_rets(r, periods_per_year):
114 """
115 Annualizes a set of returns
116 We should infer the periods per year
117 but that is currently left as an exercise
118 to the reader :-)
119 """
120 compounded_growth = (1+r).prod()
121 n_periods = r.shape[0]
122 return compounded_growth**(periods_per_year/n_periods)-1
123
124
125 def annualize_vol(r, periods_per_year):
126 """
127 Annualizes the vol of a set of returns
128 We should infer the periods per year
129 but that is currently left as an exercise
130 to the reader :-)
131 """
132 return r.std()*(periods_per_year**0.5)
133
134
135 def sharpe_ratio(r, riskfree_rate, periods_per_year):
136 """
137 Computes the annualized sharpe ratio of a set of returns
138 """
139 # convert the annual riskfree rate to per period
140 rf_per_period = (1+riskfree_rate)**(1/periods_per_year)-1
141 excess_ret = r - rf_per_period
142 ann_ex_ret = annualize_rets(excess_ret, periods_per_year)
143 ann_vol = annualize_vol(r, periods_per_year)
144 return ann_ex_ret/ann_vol
145
146
147 import scipy.stats
148 def is_normal(r, level=0.01):
149 """
150 Applies the Jarque-Bera test to determine if a Series is normal or not
151 Test is applied at the 1% level by default
152 Returns True if the hypothesis of normality is accepted, False otherwise
153 """
154 if isinstance(r, pd.DataFrame):
155 return r.aggregate(is_normal)
156 else:
157 statistic, p_value = scipy.stats.jarque_bera(r)
158 return p_value > level
159
160
161 def drawdown(return_series: pd.Series):
162 """Takes a time series of asset returns.
163 returns a DataFrame with columns for
164 the wealth index,
165 the previous peaks, and
166 the percentage drawdown
167 """
168 wealth_index = 1000*(1+return_series).cumprod()
169 previous_peaks = wealth_index.cummax()
170 drawdowns = (wealth_index - previous_peaks)/previous_peaks
171 return pd.DataFrame({"Wealth": wealth_index,
172 "Previous Peak": previous_peaks,
173 "Drawdown": drawdowns})
174
175
176 def semideviation(r):
177 """
178 Returns the semideviation aka negative semideviation of r
179 r must be a Series or a DataFrame, else raises a TypeError
180 """
181 if isinstance(r, pd.Series):
182 is_negative = r < 0
183 return r[is_negative].std(ddof=0)
184 elif isinstance(r, pd.DataFrame):
185 return r.aggregate(semideviation)
186 else:
187 raise TypeError("Expected r to be a Series or DataFrame")
188
189
190 def var_historic(r, level=5):
191 """
192 Returns the historic Value at Risk at a specified level
193 i.e. returns the number such that "level" percent of the returns
194 fall below that number, and the (100-level) percent are above
195 """
196 if isinstance(r, pd.DataFrame):
197 return r.aggregate(var_historic, level=level)
198 elif isinstance(r, pd.Series):
199 return -np.percentile(r, level)
200 else:
201 raise TypeError("Expected r to be a Series or DataFrame")
202
203
204 def cvar_historic(r, level=5):
205 """
206 Computes the Conditional VaR of Series or DataFrame
207 """
208 if isinstance(r, pd.Series):
209 is_beyond = r <= var_historic(r, level=level)
210 return -r[is_beyond].mean()
211 elif isinstance(r, pd.DataFrame):
212 return r.aggregate(cvar_historic, level=level)
213 else:
214 raise TypeError("Expected r to be a Series or DataFrame")
215
216
217 from scipy.stats import norm
218 def var_gaussian(r, level=5, modified=False):
219 """
220 Returns the Parametric Gauusian VaR of a Series or DataFrame
221 If "modified" is True, then the modified VaR is returned,
222 using the Cornish-Fisher modification
223 """
224 # compute the Z score assuming it was Gaussian
225 z = norm.ppf(level/100)
226 if modified:
227 # modify the Z score based on observed skewness and kurtosis
228 s = skewness(r)
229 k = kurtosis(r)
230 z = (z +
231 (z**2 - 1)*s/6 +
232 (z**3 -3*z)*(k-3)/24 -
233 (2*z**3 - 5*z)*(s**2)/36
234 )
235 return -(r.mean() + z*r.std(ddof=0))
236
237
238 def portfolio_return(weights, returns):
239 """
240 Computes the return on a portfolio from constituent returns and weights
241 weights are a numpy array or Nx1 matrix and returns are a numpy array or Nx1 matrix
242 """
243 return weights.T @ returns
244
245
246 def portfolio_vol(weights, covmat):
247 """
248 Computes the vol of a portfolio from a covariance matrix and constituent weights
249 weights are a numpy array or N x 1 maxtrix and covmat is an N x N matrix
250 """
251 return (weights.T @ covmat @ weights)**0.5
252
253
254 def plot_ef2(n_points, er, cov):
255 """
256 Plots the 2-asset efficient frontier
257 """
258 if er.shape[0] != 2 or er.shape[0] != 2:
259 raise ValueError("plot_ef2 can only plot 2-asset frontiers")
260 weights = [np.array([w, 1-w]) for w in np.linspace(0, 1, n_points)]
261 rets = [portfolio_return(w, er) for w in weights]
262 vols = [portfolio_vol(w, cov) for w in weights]
263 ef = pd.DataFrame({
264 "Returns": rets,
265 "Volatility": vols
266 })
267 return ef.plot.line(x="Volatility", y="Returns", style=".-")
268
269
270 from scipy.optimize import minimize
271
272 def minimize_vol(target_return, er, cov):
273 """
274 Returns the optimal weights that achieve the target return
275 given a set of expected returns and a covariance matrix
276 """
277 n = er.shape[0]
278 init_guess = np.repeat(1/n, n)
279 bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
280 # construct the constraints
281 weights_sum_to_1 = {'type': 'eq',
282 'fun': lambda weights: np.sum(weights) - 1
283 }
284 return_is_target = {'type': 'eq',
285 'args': (er,),
286 'fun': lambda weights, er: target_return - portfolio_return(weights,er)
287 }
288 weights = minimize(portfolio_vol, init_guess,
289 args=(cov,), method='SLSQP',
290 options={'disp': False},
291 constraints=(weights_sum_to_1,return_is_target),
292 bounds=bounds)
293 return weights.x
294
295
296 def msr(riskfree_rate, er, cov):
297 """
298 Returns the weights of the portfolio that gives you the maximum sharpe ratio
299 given the riskfree rate and expected returns and a covariance matrix
300 """
301 n = er.shape[0]
302 init_guess = np.repeat(1/n, n)
303 bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
304 # construct the constraints
305 weights_sum_to_1 = {'type': 'eq',
306 'fun': lambda weights: np.sum(weights) - 1
307 }
308 def neg_sharpe(weights, riskfree_rate, er, cov):
309 """
310 Returns the negative of the sharpe ratio
311 of the given portfolio
312 """
313 r = portfolio_return(weights, er)
314 vol = portfolio_vol(weights, cov)
315 return -(r - riskfree_rate)/vol
316
317 weights = minimize(neg_sharpe, init_guess,
318 args=(riskfree_rate, er, cov), method='SLSQP',
319 options={'disp': False},
320 constraints=(weights_sum_to_1,),
321 bounds=bounds)
322 return weights.x
323
324
325 def gmv(cov):
326 """
327 Returns the weights of the Global Minimum Volatility portfolio
328 given a covariance matrix
329 """
330 n = cov.shape[0]
331 return msr(0, np.repeat(1, n), cov)
332
333
334 def optimal_weights(n_points, er, cov):
335 """
336 Returns a list of weights that represent a grid of n_points on the efficient frontier
337 """
338 target_rs = np.linspace(er.min(), er.max(), n_points)
339 weights = [minimize_vol(target_return, er, cov) for target_return in target_rs]
340 return weights
341
342
343 def plot_ef(n_points, er, cov, style='.-', legend=False, show_cml=False, riskfree_rate=0, show_ew=False, show_gmv=False):
344 """
345 Plots the multi-asset efficient frontier
346 """
347 weights = optimal_weights(n_points, er, cov)
348 rets = [portfolio_return(w, er) for w in weights]
349 vols = [portfolio_vol(w, cov) for w in weights]
350 ef = pd.DataFrame({
351 "Returns": rets,
352 "Volatility": vols
353 })
354 ax = ef.plot.line(x="Volatility", y="Returns", style=style, legend=legend)
355 if show_cml:
356 ax.set_xlim(left = 0)
357 # get MSR
358 w_msr = msr(riskfree_rate, er, cov)
359 r_msr = portfolio_return(w_msr, er)
360 vol_msr = portfolio_vol(w_msr, cov)
361 # add CML
362 cml_x = [0, vol_msr]
363 cml_y = [riskfree_rate, r_msr]
364 ax.plot(cml_x, cml_y, color='green', marker='o', linestyle='dashed', linewidth=2, markersize=10)
365 if show_ew:
366 n = er.shape[0]
367 w_ew = np.repeat(1/n, n)
368 r_ew = portfolio_return(w_ew, er)
369 vol_ew = portfolio_vol(w_ew, cov)
370 # add EW
371 ax.plot([vol_ew], [r_ew], color='goldenrod', marker='o', markersize=10)
372 if show_gmv:
373 w_gmv = gmv(cov)
374 r_gmv = portfolio_return(w_gmv, er)
375 vol_gmv = portfolio_vol(w_gmv, cov)
376 # add EW
377 ax.plot([vol_gmv], [r_gmv], color='midnightblue', marker='o', markersize=10)
378
379 return ax
380
381
382 def run_cppi(risky_r, safe_r=None, m=3, start=1000, floor=0.8, riskfree_rate=0.03, drawdown=None):
383 """
384 Run a backtest of the CPPI strategy, given a set of returns for the risky asset
385 Returns a dictionary containing: Asset Value History, Risk Budget History, Risky Weight History
386 """
387 # set up the CPPI parameters
388 dates = risky_r.index
389 n_steps = len(dates)
390 account_value = start
391 floor_value = start*floor
392 peak = account_value
393 if isinstance(risky_r, pd.Series):
394 risky_r = pd.DataFrame(risky_r, columns=["R"])
395
396 if safe_r is None:
397 safe_r = pd.DataFrame().reindex_like(risky_r)
398 safe_r.values[:] = riskfree_rate/12 # fast way to set all values to a number
399 # set up some DataFrames for saving intermediate values
400 account_history = pd.DataFrame().reindex_like(risky_r)
401 risky_w_history = pd.DataFrame().reindex_like(risky_r)
402 cushion_history = pd.DataFrame().reindex_like(risky_r)
403 floorval_history = pd.DataFrame().reindex_like(risky_r)
404 peak_history = pd.DataFrame().reindex_like(risky_r)
405
406 for step in range(n_steps):
407 if drawdown is not None:
408 peak = np.maximum(peak, account_value)
409 floor_value = peak*(1-drawdown)
410 cushion = (account_value - floor_value)/account_value
411 risky_w = m*cushion
412 risky_w = np.minimum(risky_w, 1)
413 risky_w = np.maximum(risky_w, 0)
414 safe_w = 1-risky_w
415 risky_alloc = account_value*risky_w
416 safe_alloc = account_value*safe_w
417 # recompute the new account value at the end of this step
418 account_value = risky_alloc*(1+risky_r.iloc[step]) + safe_alloc*(1+safe_r.iloc[step])
419 # save the histories for analysis and plotting
420 cushion_history.iloc[step] = cushion
421 risky_w_history.iloc[step] = risky_w
422 account_history.iloc[step] = account_value
423 floorval_history.iloc[step] = floor_value
424 peak_history.iloc[step] = peak
425 risky_wealth = start*(1+risky_r).cumprod()
426 backtest_result = {
427 "Wealth": account_history,
428 "Risky Wealth": risky_wealth,
429 "Risk Budget": cushion_history,
430 "Risky Allocation": risky_w_history,
431 "m": m,
432 "start": start,
433 "floor": floor,
434 "risky_r":risky_r,
435 "safe_r": safe_r,
436 "drawdown": drawdown,
437 "peak": peak_history,
438 "floor": floorval_history
439 }
440 return backtest_result
441
442
443 def summary_stats(r, riskfree_rate=0.03):
444 """
445 Return a DataFrame that contains aggregated summary stats for the returns in the columns of r
446 """
447 ann_r = r.aggregate(annualize_rets, periods_per_year=12)
448 ann_vol = r.aggregate(annualize_vol, periods_per_year=12)
449 ann_sr = r.aggregate(sharpe_ratio, riskfree_rate=riskfree_rate, periods_per_year=12)
450 dd = r.aggregate(lambda r: drawdown(r).Drawdown.min())
451 skew = r.aggregate(skewness)
452 kurt = r.aggregate(kurtosis)
453 cf_var5 = r.aggregate(var_gaussian, modified=True)
454 hist_cvar5 = r.aggregate(cvar_historic)
455 return pd.DataFrame({
456 "Annualized Return": ann_r,
457 "Annualized Vol": ann_vol,
458 "Skewness": skew,
459 "Kurtosis": kurt,
460 "Cornish-Fisher VaR (5%)": cf_var5,
461 "Historic CVaR (5%)": hist_cvar5,
462 "Sharpe Ratio": ann_sr,
463 "Max Drawdown": dd
464 })
465
466
467 def gbm(n_years = 10, n_scenarios=1000, mu=0.07, sigma=0.15, steps_per_year=12, s_0=100.0):
468 """
469 Evolution of Geometric Brownian Motion trajectories, such as for Stock Prices through Monte Carlo
470 :param n_years: The number of years to generate data for
471 :param n_paths: The number of scenarios/trajectories
472 :param mu: Annualized Drift, e.g. Market Return
473 :param sigma: Annualized Volatility
474 :param steps_per_year: granularity of the simulation
475 :param s_0: initial value
476 :return: a numpy array of n_paths columns and n_years*steps_per_year rows
477 """
478 # Derive per-step Model Parameters from User Specifications
479 dt = 1/steps_per_year
480 n_steps = int(n_years*steps_per_year) + 1
481 rets_plus_1 = np.random.normal(loc=mu*dt+1, scale=sigma*np.sqrt(dt), size=(n_steps, n_scenarios))
482 # or better ...
483 # rets_plus_1 = np.random.normal(loc=(1+mu)**dt, scale=(sigma*np.sqrt(dt)), size=(n_steps, n_scenarios))
484 rets_plus_1[0] = 1
485 prices = s_0*pd.DataFrame(rets_plus_1).cumprod()
486 return prices
487