ml-finance-python

python scripts for finance machine learning

git clone https://9o.is/git/ml-finance-python.git

edhec_risk_kit_124.py

(17394B)


      1 import pandas as pd
      2 import numpy as np
      3 
      4 def get_ffme_returns():
      5     """
      6     Load the Fama-French Dataset for the returns of the Top and Bottom Deciles by MarketCap
      7     """
      8     me_m = pd.read_csv("data/Portfolios_Formed_on_ME_monthly_EW.csv",
      9                        header=0, index_col=0, na_values=-99.99)
     10     rets = me_m[['Lo 10', 'Hi 10']]
     11     rets.columns = ['SmallCap', 'LargeCap']
     12     rets = rets/100
     13     rets.index = pd.to_datetime(rets.index, format="%Y%m").to_period('M')
     14     return rets
     15 
     16 
     17 def get_hfi_returns():
     18     """
     19     Load and format the EDHEC Hedge Fund Index Returns
     20     """
     21     hfi = pd.read_csv("data/edhec-hedgefundindices.csv",
     22                       header=0, index_col=0, parse_dates=True)
     23     hfi = hfi/100
     24     hfi.index = hfi.index.to_period('M')
     25     return hfi
     26 
     27 def get_ind_file(filetype):
     28     """
     29     Load and format the Ken French 30 Industry Portfolios files
     30     """
     31     known_types = ["returns", "nfirms", "size"]
     32     if filetype not in known_types:
     33         raise ValueError(f"filetype must be one of:{','.join(known_types)}")
     34     if filetype is "returns":
     35         name = "vw_rets"
     36         divisor = 100
     37     elif filetype is "nfirms":
     38         name = "nfirms"
     39         divisor = 1
     40     elif filetype is "size":
     41         name = "size"
     42         divisor = 1
     43                          
     44     ind = pd.read_csv(f"data/ind30_m_{name}.csv", header=0, index_col=0)/divisor
     45     ind.index = pd.to_datetime(ind.index, format="%Y%m").to_period('M')
     46     ind.columns = ind.columns.str.strip()
     47     return ind
     48 
     49 def get_ind_returns():
     50     """
     51     Load and format the Ken French 30 Industry Portfolios Value Weighted Monthly Returns
     52     """
     53     return get_ind_file("returns")
     54 
     55 def get_ind_nfirms():
     56     """
     57     Load and format the Ken French 30 Industry Portfolios Average number of Firms
     58     """
     59     return get_ind_file("nfirms")
     60 
     61 def get_ind_size():
     62     """
     63     Load and format the Ken French 30 Industry Portfolios Average size (market cap)
     64     """
     65     return get_ind_file("size")
     66 
     67                          
     68 def get_total_market_index_returns():
     69     """
     70     Load the 30 industry portfolio data and derive the returns of a capweighted total market index
     71     """
     72     ind_nfirms = get_ind_nfirms()
     73     ind_size = get_ind_size()
     74     ind_return = get_ind_returns()
     75     ind_mktcap = ind_nfirms * ind_size
     76     total_mktcap = ind_mktcap.sum(axis=1)
     77     ind_capweight = ind_mktcap.divide(total_mktcap, axis="rows")
     78     total_market_return = (ind_capweight * ind_return).sum(axis="columns")
     79     return total_market_return
     80                          
     81 def skewness(r):
     82     """
     83     Alternative to scipy.stats.skew()
     84     Computes the skewness of the supplied Series or DataFrame
     85     Returns a float or a Series
     86     """
     87     demeaned_r = r - r.mean()
     88     # use the population standard deviation, so set dof=0
     89     sigma_r = r.std(ddof=0)
     90     exp = (demeaned_r**3).mean()
     91     return exp/sigma_r**3
     92 
     93 
     94 def kurtosis(r):
     95     """
     96     Alternative to scipy.stats.kurtosis()
     97     Computes the kurtosis of the supplied Series or DataFrame
     98     Returns a float or a Series
     99     """
    100     demeaned_r = r - r.mean()
    101     # use the population standard deviation, so set dof=0
    102     sigma_r = r.std(ddof=0)
    103     exp = (demeaned_r**4).mean()
    104     return exp/sigma_r**4
    105 
    106 
    107 def compound(r):
    108     """
    109     returns the result of compounding the set of returns in r
    110     """
    111     return np.expm1(np.log1p(r).sum())
    112 
    113                          
    114 def annualize_rets(r, periods_per_year):
    115     """
    116     Annualizes a set of returns
    117     We should infer the periods per year
    118     but that is currently left as an exercise
    119     to the reader :-)
    120     """
    121     compounded_growth = (1+r).prod()
    122     n_periods = r.shape[0]
    123     return compounded_growth**(periods_per_year/n_periods)-1
    124 
    125 
    126 def annualize_vol(r, periods_per_year):
    127     """
    128     Annualizes the vol of a set of returns
    129     We should infer the periods per year
    130     but that is currently left as an exercise
    131     to the reader :-)
    132     """
    133     return r.std()*(periods_per_year**0.5)
    134 
    135 
    136 def sharpe_ratio(r, riskfree_rate, periods_per_year):
    137     """
    138     Computes the annualized sharpe ratio of a set of returns
    139     """
    140     # convert the annual riskfree rate to per period
    141     rf_per_period = (1+riskfree_rate)**(1/periods_per_year)-1
    142     excess_ret = r - rf_per_period
    143     ann_ex_ret = annualize_rets(excess_ret, periods_per_year)
    144     ann_vol = annualize_vol(r, periods_per_year)
    145     return ann_ex_ret/ann_vol
    146 
    147 
    148 import scipy.stats
    149 def is_normal(r, level=0.01):
    150     """
    151     Applies the Jarque-Bera test to determine if a Series is normal or not
    152     Test is applied at the 1% level by default
    153     Returns True if the hypothesis of normality is accepted, False otherwise
    154     """
    155     if isinstance(r, pd.DataFrame):
    156         return r.aggregate(is_normal)
    157     else:
    158         statistic, p_value = scipy.stats.jarque_bera(r)
    159         return p_value > level
    160 
    161 
    162 def drawdown(return_series: pd.Series):
    163     """Takes a time series of asset returns.
    164        returns a DataFrame with columns for
    165        the wealth index, 
    166        the previous peaks, and 
    167        the percentage drawdown
    168     """
    169     wealth_index = 1000*(1+return_series).cumprod()
    170     previous_peaks = wealth_index.cummax()
    171     drawdowns = (wealth_index - previous_peaks)/previous_peaks
    172     return pd.DataFrame({"Wealth": wealth_index, 
    173                          "Previous Peak": previous_peaks, 
    174                          "Drawdown": drawdowns})
    175 
    176 
    177 def semideviation(r):
    178     """
    179     Returns the semideviation aka negative semideviation of r
    180     r must be a Series or a DataFrame, else raises a TypeError
    181     """
    182     if isinstance(r, pd.Series):
    183         is_negative = r < 0
    184         return r[is_negative].std(ddof=0)
    185     elif isinstance(r, pd.DataFrame):
    186         return r.aggregate(semideviation)
    187     else:
    188         raise TypeError("Expected r to be a Series or DataFrame")
    189 
    190 
    191 def var_historic(r, level=5):
    192     """
    193     Returns the historic Value at Risk at a specified level
    194     i.e. returns the number such that "level" percent of the returns
    195     fall below that number, and the (100-level) percent are above
    196     """
    197     if isinstance(r, pd.DataFrame):
    198         return r.aggregate(var_historic, level=level)
    199     elif isinstance(r, pd.Series):
    200         return -np.percentile(r, level)
    201     else:
    202         raise TypeError("Expected r to be a Series or DataFrame")
    203 
    204 
    205 def cvar_historic(r, level=5):
    206     """
    207     Computes the Conditional VaR of Series or DataFrame
    208     """
    209     if isinstance(r, pd.Series):
    210         is_beyond = r <= -var_historic(r, level=level)
    211         return -r[is_beyond].mean()
    212     elif isinstance(r, pd.DataFrame):
    213         return r.aggregate(cvar_historic, level=level)
    214     else:
    215         raise TypeError("Expected r to be a Series or DataFrame")
    216 
    217 
    218 from scipy.stats import norm
    219 def var_gaussian(r, level=5, modified=False):
    220     """
    221     Returns the Parametric Gauusian VaR of a Series or DataFrame
    222     If "modified" is True, then the modified VaR is returned,
    223     using the Cornish-Fisher modification
    224     """
    225     # compute the Z score assuming it was Gaussian
    226     z = norm.ppf(level/100)
    227     if modified:
    228         # modify the Z score based on observed skewness and kurtosis
    229         s = skewness(r)
    230         k = kurtosis(r)
    231         z = (z +
    232                 (z**2 - 1)*s/6 +
    233                 (z**3 -3*z)*(k-3)/24 -
    234                 (2*z**3 - 5*z)*(s**2)/36
    235             )
    236     return -(r.mean() + z*r.std(ddof=0))
    237 
    238 
    239 def portfolio_return(weights, returns):
    240     """
    241     Computes the return on a portfolio from constituent returns and weights
    242     weights are a numpy array or Nx1 matrix and returns are a numpy array or Nx1 matrix
    243     """
    244     return weights.T @ returns
    245 
    246 
    247 def portfolio_vol(weights, covmat):
    248     """
    249     Computes the vol of a portfolio from a covariance matrix and constituent weights
    250     weights are a numpy array or N x 1 maxtrix and covmat is an N x N matrix
    251     """
    252     return (weights.T @ covmat @ weights)**0.5
    253 
    254 
    255 def plot_ef2(n_points, er, cov):
    256     """
    257     Plots the 2-asset efficient frontier
    258     """
    259     if er.shape[0] != 2 or er.shape[0] != 2:
    260         raise ValueError("plot_ef2 can only plot 2-asset frontiers")
    261     weights = [np.array([w, 1-w]) for w in np.linspace(0, 1, n_points)]
    262     rets = [portfolio_return(w, er) for w in weights]
    263     vols = [portfolio_vol(w, cov) for w in weights]
    264     ef = pd.DataFrame({
    265         "Returns": rets, 
    266         "Volatility": vols
    267     })
    268     return ef.plot.line(x="Volatility", y="Returns", style=".-")
    269 
    270 
    271 from scipy.optimize import minimize
    272 
    273 def minimize_vol(target_return, er, cov):
    274     """
    275     Returns the optimal weights that achieve the target return
    276     given a set of expected returns and a covariance matrix
    277     """
    278     n = er.shape[0]
    279     init_guess = np.repeat(1/n, n)
    280     bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
    281     # construct the constraints
    282     weights_sum_to_1 = {'type': 'eq',
    283                         'fun': lambda weights: np.sum(weights) - 1
    284     }
    285     return_is_target = {'type': 'eq',
    286                         'args': (er,),
    287                         'fun': lambda weights, er: target_return - portfolio_return(weights,er)
    288     }
    289     weights = minimize(portfolio_vol, init_guess,
    290                        args=(cov,), method='SLSQP',
    291                        options={'disp': False},
    292                        constraints=(weights_sum_to_1,return_is_target),
    293                        bounds=bounds)
    294     return weights.x
    295 
    296 
    297 def msr(riskfree_rate, er, cov):
    298     """
    299     Returns the weights of the portfolio that gives you the maximum sharpe ratio
    300     given the riskfree rate and expected returns and a covariance matrix
    301     """
    302     n = er.shape[0]
    303     init_guess = np.repeat(1/n, n)
    304     bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
    305     # construct the constraints
    306     weights_sum_to_1 = {'type': 'eq',
    307                         'fun': lambda weights: np.sum(weights) - 1
    308     }
    309     def neg_sharpe(weights, riskfree_rate, er, cov):
    310         """
    311         Returns the negative of the sharpe ratio
    312         of the given portfolio
    313         """
    314         r = portfolio_return(weights, er)
    315         vol = portfolio_vol(weights, cov)
    316         return -(r - riskfree_rate)/vol
    317     
    318     weights = minimize(neg_sharpe, init_guess,
    319                        args=(riskfree_rate, er, cov), method='SLSQP',
    320                        options={'disp': False},
    321                        constraints=(weights_sum_to_1,),
    322                        bounds=bounds)
    323     return weights.x
    324 
    325 
    326 def gmv(cov):
    327     """
    328     Returns the weights of the Global Minimum Volatility portfolio
    329     given a covariance matrix
    330     """
    331     n = cov.shape[0]
    332     return msr(0, np.repeat(1, n), cov)
    333 
    334 
    335 def optimal_weights(n_points, er, cov):
    336     """
    337     Returns a list of weights that represent a grid of n_points on the efficient frontier
    338     """
    339     target_rs = np.linspace(er.min(), er.max(), n_points)
    340     weights = [minimize_vol(target_return, er, cov) for target_return in target_rs]
    341     return weights
    342 
    343 
    344 def plot_ef(n_points, er, cov, style='.-', legend=False, show_cml=False, riskfree_rate=0, show_ew=False, show_gmv=False):
    345     """
    346     Plots the multi-asset efficient frontier
    347     """
    348     weights = optimal_weights(n_points, er, cov)
    349     rets = [portfolio_return(w, er) for w in weights]
    350     vols = [portfolio_vol(w, cov) for w in weights]
    351     ef = pd.DataFrame({
    352         "Returns": rets, 
    353         "Volatility": vols
    354     })
    355     ax = ef.plot.line(x="Volatility", y="Returns", style=style, legend=legend)
    356     if show_cml:
    357         ax.set_xlim(left = 0)
    358         # get MSR
    359         w_msr = msr(riskfree_rate, er, cov)
    360         r_msr = portfolio_return(w_msr, er)
    361         vol_msr = portfolio_vol(w_msr, cov)
    362         # add CML
    363         cml_x = [0, vol_msr]
    364         cml_y = [riskfree_rate, r_msr]
    365         ax.plot(cml_x, cml_y, color='green', marker='o', linestyle='dashed', linewidth=2, markersize=10)
    366     if show_ew:
    367         n = er.shape[0]
    368         w_ew = np.repeat(1/n, n)
    369         r_ew = portfolio_return(w_ew, er)
    370         vol_ew = portfolio_vol(w_ew, cov)
    371         # add EW
    372         ax.plot([vol_ew], [r_ew], color='goldenrod', marker='o', markersize=10)
    373     if show_gmv:
    374         w_gmv = gmv(cov)
    375         r_gmv = portfolio_return(w_gmv, er)
    376         vol_gmv = portfolio_vol(w_gmv, cov)
    377         # add EW
    378         ax.plot([vol_gmv], [r_gmv], color='midnightblue', marker='o', markersize=10)
    379         
    380         return ax
    381 
    382                          
    383 def run_cppi(risky_r, safe_r=None, m=3, start=1000, floor=0.8, riskfree_rate=0.03, drawdown=None):
    384     """
    385     Run a backtest of the CPPI strategy, given a set of returns for the risky asset
    386     Returns a dictionary containing: Asset Value History, Risk Budget History, Risky Weight History
    387     """
    388     # set up the CPPI parameters
    389     dates = risky_r.index
    390     n_steps = len(dates)
    391     account_value = start
    392     floor_value = start*floor
    393     peak = account_value
    394     if isinstance(risky_r, pd.Series): 
    395         risky_r = pd.DataFrame(risky_r, columns=["R"])
    396 
    397     if safe_r is None:
    398         safe_r = pd.DataFrame().reindex_like(risky_r)
    399         safe_r.values[:] = riskfree_rate/12 # fast way to set all values to a number
    400     # set up some DataFrames for saving intermediate values
    401     account_history = pd.DataFrame().reindex_like(risky_r)
    402     risky_w_history = pd.DataFrame().reindex_like(risky_r)
    403     cushion_history = pd.DataFrame().reindex_like(risky_r)
    404     floorval_history = pd.DataFrame().reindex_like(risky_r)
    405     peak_history = pd.DataFrame().reindex_like(risky_r)
    406 
    407     for step in range(n_steps):
    408         if drawdown is not None:
    409             peak = np.maximum(peak, account_value)
    410             floor_value = peak*(1-drawdown)
    411         cushion = (account_value - floor_value)/account_value
    412         risky_w = m*cushion
    413         risky_w = np.minimum(risky_w, 1)
    414         risky_w = np.maximum(risky_w, 0)
    415         safe_w = 1-risky_w
    416         risky_alloc = account_value*risky_w
    417         safe_alloc = account_value*safe_w
    418         # recompute the new account value at the end of this step
    419         account_value = risky_alloc*(1+risky_r.iloc[step]) + safe_alloc*(1+safe_r.iloc[step])
    420         # save the histories for analysis and plotting
    421         cushion_history.iloc[step] = cushion
    422         risky_w_history.iloc[step] = risky_w
    423         account_history.iloc[step] = account_value
    424         floorval_history.iloc[step] = floor_value
    425         peak_history.iloc[step] = peak
    426     risky_wealth = start*(1+risky_r).cumprod()
    427     backtest_result = {
    428         "Wealth": account_history,
    429         "Risky Wealth": risky_wealth, 
    430         "Risk Budget": cushion_history,
    431         "Risky Allocation": risky_w_history,
    432         "m": m,
    433         "start": start,
    434         "floor": floor,
    435         "risky_r":risky_r,
    436         "safe_r": safe_r,
    437         "drawdown": drawdown,
    438         "peak": peak_history,
    439         "floor": floorval_history
    440     }
    441     return backtest_result
    442 
    443 
    444 def summary_stats(r, riskfree_rate=0.03):
    445     """
    446     Return a DataFrame that contains aggregated summary stats for the returns in the columns of r
    447     """
    448     ann_r = r.aggregate(annualize_rets, periods_per_year=12)
    449     ann_vol = r.aggregate(annualize_vol, periods_per_year=12)
    450     ann_sr = r.aggregate(sharpe_ratio, riskfree_rate=riskfree_rate, periods_per_year=12)
    451     dd = r.aggregate(lambda r: drawdown(r).Drawdown.min())
    452     skew = r.aggregate(skewness)
    453     kurt = r.aggregate(kurtosis)
    454     cf_var5 = r.aggregate(var_gaussian, modified=True)
    455     hist_cvar5 = r.aggregate(cvar_historic)
    456     return pd.DataFrame({
    457         "Annualized Return": ann_r,
    458         "Annualized Vol": ann_vol,
    459         "Skewness": skew,
    460         "Kurtosis": kurt,
    461         "Cornish-Fisher VaR (5%)": cf_var5,
    462         "Historic CVaR (5%)": hist_cvar5,
    463         "Sharpe Ratio": ann_sr,
    464         "Max Drawdown": dd
    465     })
    466 
    467                          
    468 def gbm(n_years = 10, n_scenarios=1000, mu=0.07, sigma=0.15, steps_per_year=12, s_0=100.0, prices=True):
    469     """
    470     Evolution of Geometric Brownian Motion trajectories, such as for Stock Prices through Monte Carlo
    471     :param n_years:  The number of years to generate data for
    472     :param n_paths: The number of scenarios/trajectories
    473     :param mu: Annualized Drift, e.g. Market Return
    474     :param sigma: Annualized Volatility
    475     :param steps_per_year: granularity of the simulation
    476     :param s_0: initial value
    477     :return: a numpy array of n_paths columns and n_years*steps_per_year rows
    478     """
    479     # Derive per-step Model Parameters from User Specifications
    480     dt = 1/steps_per_year
    481     n_steps = int(n_years*steps_per_year) + 1
    482     # the standard way ...
    483     # rets_plus_1 = np.random.normal(loc=mu*dt+1, scale=sigma*np.sqrt(dt), size=(n_steps, n_scenarios))
    484     # without discretization error ...
    485     rets_plus_1 = np.random.normal(loc=(1+mu)**dt, scale=(sigma*np.sqrt(dt)), size=(n_steps, n_scenarios))
    486     rets_plus_1[0] = 1
    487     ret_val = s_0*pd.DataFrame(rets_plus_1).cumprod() if prices else rets_plus_1-1
    488     return ret_val
    489 
    490 def discount(t, r):
    491     """
    492     Compute the price of a pure discount bond that pays a dollar at time t where t is in years and r is the annual interest rate
    493     """
    494     return (1+r)**(-t)
    495 
    496 def pv(l, r):
    497     """
    498     Compute the present value of a list of liabilities given by the time (as an index) and amounts
    499     """
    500     dates = l.index
    501     discounts = discount(dates, r)
    502     return (discounts*l).sum()
    503 
    504 def funding_ratio(assets, liabilities, r):
    505     """
    506     Computes the funding ratio of a series of liabilities, based on an interest rate and current value of assets
    507     """
    508     return assets/pv(liabilities, r)