ml-finance-python

python scripts for finance machine learning

git clone https://9o.is/git/ml-finance-python.git

edhec_risk_kit_201.py

(17806B)


      1 import pandas as pd
      2 import numpy as np
      3 
      4 def get_ffme_returns():
      5     """
      6     Load the Fama-French Dataset for the returns of the Top and Bottom Deciles by MarketCap
      7     """
      8     me_m = pd.read_csv("data/Portfolios_Formed_on_ME_monthly_EW.csv",
      9                        header=0, index_col=0, na_values=-99.99)
     10     rets = me_m[['Lo 10', 'Hi 10']]
     11     rets.columns = ['SmallCap', 'LargeCap']
     12     rets = rets/100
     13     rets.index = pd.to_datetime(rets.index, format="%Y%m").to_period('M')
     14     return rets
     15 
     16 def get_fff_returns():
     17     """
     18     Load the Fama-French Research Factor Monthly Dataset
     19     """
     20     rets = pd.read_csv("data/F-F_Research_Data_Factors_m.csv",
     21                        header=0, index_col=0, na_values=-99.99)/100
     22     rets.index = pd.to_datetime(rets.index, format="%Y%m").to_period('M')
     23     return rets
     24 
     25 
     26 def get_hfi_returns():
     27     """
     28     Load and format the EDHEC Hedge Fund Index Returns
     29     """
     30     hfi = pd.read_csv("data/edhec-hedgefundindices.csv",
     31                       header=0, index_col=0, parse_dates=True)
     32     hfi = hfi/100
     33     hfi.index = hfi.index.to_period('M')
     34     return hfi
     35 
     36 def get_ind_file(filetype):
     37     """
     38     Load and format the Ken French 30 Industry Portfolios files
     39     """
     40     known_types = ["returns", "nfirms", "size"]
     41     if filetype not in known_types:
     42         raise ValueError(f"filetype must be one of:{','.join(known_types)}")
     43     if filetype is "returns":
     44         name = "vw_rets"
     45         divisor = 100
     46     elif filetype is "nfirms":
     47         name = "nfirms"
     48         divisor = 1
     49     elif filetype is "size":
     50         name = "size"
     51         divisor = 1
     52                          
     53     ind = pd.read_csv(f"data/ind30_m_{name}.csv", header=0, index_col=0)/divisor
     54     ind.index = pd.to_datetime(ind.index, format="%Y%m").to_period('M')
     55     ind.columns = ind.columns.str.strip()
     56     return ind
     57 
     58 def get_ind_returns():
     59     """
     60     Load and format the Ken French 30 Industry Portfolios Value Weighted Monthly Returns
     61     """
     62     return get_ind_file("returns")
     63 
     64 def get_ind_nfirms():
     65     """
     66     Load and format the Ken French 30 Industry Portfolios Average number of Firms
     67     """
     68     return get_ind_file("nfirms")
     69 
     70 def get_ind_size():
     71     """
     72     Load and format the Ken French 30 Industry Portfolios Average size (market cap)
     73     """
     74     return get_ind_file("size")
     75 
     76                          
     77 def get_total_market_index_returns():
     78     """
     79     Load the 30 industry portfolio data and derive the returns of a capweighted total market index
     80     """
     81     ind_nfirms = get_ind_nfirms()
     82     ind_size = get_ind_size()
     83     ind_return = get_ind_returns()
     84     ind_mktcap = ind_nfirms * ind_size
     85     total_mktcap = ind_mktcap.sum(axis=1)
     86     ind_capweight = ind_mktcap.divide(total_mktcap, axis="rows")
     87     total_market_return = (ind_capweight * ind_return).sum(axis="columns")
     88     return total_market_return
     89                          
     90 def skewness(r):
     91     """
     92     Alternative to scipy.stats.skew()
     93     Computes the skewness of the supplied Series or DataFrame
     94     Returns a float or a Series
     95     """
     96     demeaned_r = r - r.mean()
     97     # use the population standard deviation, so set dof=0
     98     sigma_r = r.std(ddof=0)
     99     exp = (demeaned_r**3).mean()
    100     return exp/sigma_r**3
    101 
    102 
    103 def kurtosis(r):
    104     """
    105     Alternative to scipy.stats.kurtosis()
    106     Computes the kurtosis of the supplied Series or DataFrame
    107     Returns a float or a Series
    108     """
    109     demeaned_r = r - r.mean()
    110     # use the population standard deviation, so set dof=0
    111     sigma_r = r.std(ddof=0)
    112     exp = (demeaned_r**4).mean()
    113     return exp/sigma_r**4
    114 
    115 
    116 def compound(r):
    117     """
    118     returns the result of compounding the set of returns in r
    119     """
    120     return np.expm1(np.log1p(r).sum())
    121 
    122                          
    123 def annualize_rets(r, periods_per_year):
    124     """
    125     Annualizes a set of returns
    126     We should infer the periods per year
    127     but that is currently left as an exercise
    128     to the reader :-)
    129     """
    130     compounded_growth = (1+r).prod()
    131     n_periods = r.shape[0]
    132     return compounded_growth**(periods_per_year/n_periods)-1
    133 
    134 
    135 def annualize_vol(r, periods_per_year):
    136     """
    137     Annualizes the vol of a set of returns
    138     We should infer the periods per year
    139     but that is currently left as an exercise
    140     to the reader :-)
    141     """
    142     return r.std()*(periods_per_year**0.5)
    143 
    144 
    145 def sharpe_ratio(r, riskfree_rate, periods_per_year):
    146     """
    147     Computes the annualized sharpe ratio of a set of returns
    148     """
    149     # convert the annual riskfree rate to per period
    150     rf_per_period = (1+riskfree_rate)**(1/periods_per_year)-1
    151     excess_ret = r - rf_per_period
    152     ann_ex_ret = annualize_rets(excess_ret, periods_per_year)
    153     ann_vol = annualize_vol(r, periods_per_year)
    154     return ann_ex_ret/ann_vol
    155 
    156 
    157 import scipy.stats
    158 def is_normal(r, level=0.01):
    159     """
    160     Applies the Jarque-Bera test to determine if a Series is normal or not
    161     Test is applied at the 1% level by default
    162     Returns True if the hypothesis of normality is accepted, False otherwise
    163     """
    164     if isinstance(r, pd.DataFrame):
    165         return r.aggregate(is_normal)
    166     else:
    167         statistic, p_value = scipy.stats.jarque_bera(r)
    168         return p_value > level
    169 
    170 
    171 def drawdown(return_series: pd.Series):
    172     """Takes a time series of asset returns.
    173        returns a DataFrame with columns for
    174        the wealth index, 
    175        the previous peaks, and 
    176        the percentage drawdown
    177     """
    178     wealth_index = 1000*(1+return_series).cumprod()
    179     previous_peaks = wealth_index.cummax()
    180     drawdowns = (wealth_index - previous_peaks)/previous_peaks
    181     return pd.DataFrame({"Wealth": wealth_index, 
    182                          "Previous Peak": previous_peaks, 
    183                          "Drawdown": drawdowns})
    184 
    185 
    186 def semideviation(r):
    187     """
    188     Returns the semideviation aka negative semideviation of r
    189     r must be a Series or a DataFrame, else raises a TypeError
    190     """
    191     if isinstance(r, pd.Series):
    192         is_negative = r < 0
    193         return r[is_negative].std(ddof=0)
    194     elif isinstance(r, pd.DataFrame):
    195         return r.aggregate(semideviation)
    196     else:
    197         raise TypeError("Expected r to be a Series or DataFrame")
    198 
    199 
    200 def var_historic(r, level=5):
    201     """
    202     Returns the historic Value at Risk at a specified level
    203     i.e. returns the number such that "level" percent of the returns
    204     fall below that number, and the (100-level) percent are above
    205     """
    206     if isinstance(r, pd.DataFrame):
    207         return r.aggregate(var_historic, level=level)
    208     elif isinstance(r, pd.Series):
    209         return -np.percentile(r, level)
    210     else:
    211         raise TypeError("Expected r to be a Series or DataFrame")
    212 
    213 
    214 def cvar_historic(r, level=5):
    215     """
    216     Computes the Conditional VaR of Series or DataFrame
    217     """
    218     if isinstance(r, pd.Series):
    219         is_beyond = r <= var_historic(r, level=level)
    220         return -r[is_beyond].mean()
    221     elif isinstance(r, pd.DataFrame):
    222         return r.aggregate(cvar_historic, level=level)
    223     else:
    224         raise TypeError("Expected r to be a Series or DataFrame")
    225 
    226 
    227 from scipy.stats import norm
    228 def var_gaussian(r, level=5, modified=False):
    229     """
    230     Returns the Parametric Gauusian VaR of a Series or DataFrame
    231     If "modified" is True, then the modified VaR is returned,
    232     using the Cornish-Fisher modification
    233     """
    234     # compute the Z score assuming it was Gaussian
    235     z = norm.ppf(level/100)
    236     if modified:
    237         # modify the Z score based on observed skewness and kurtosis
    238         s = skewness(r)
    239         k = kurtosis(r)
    240         z = (z +
    241                 (z**2 - 1)*s/6 +
    242                 (z**3 -3*z)*(k-3)/24 -
    243                 (2*z**3 - 5*z)*(s**2)/36
    244             )
    245     return -(r.mean() + z*r.std(ddof=0))
    246 
    247 
    248 def portfolio_return(weights, returns):
    249     """
    250     Computes the return on a portfolio from constituent returns and weights
    251     weights are a numpy array or Nx1 matrix and returns are a numpy array or Nx1 matrix
    252     """
    253     return weights.T @ returns
    254 
    255 
    256 def portfolio_vol(weights, covmat):
    257     """
    258     Computes the vol of a portfolio from a covariance matrix and constituent weights
    259     weights are a numpy array or N x 1 maxtrix and covmat is an N x N matrix
    260     """
    261     return (weights.T @ covmat @ weights)**0.5
    262 
    263 
    264 def plot_ef2(n_points, er, cov):
    265     """
    266     Plots the 2-asset efficient frontier
    267     """
    268     if er.shape[0] != 2 or er.shape[0] != 2:
    269         raise ValueError("plot_ef2 can only plot 2-asset frontiers")
    270     weights = [np.array([w, 1-w]) for w in np.linspace(0, 1, n_points)]
    271     rets = [portfolio_return(w, er) for w in weights]
    272     vols = [portfolio_vol(w, cov) for w in weights]
    273     ef = pd.DataFrame({
    274         "Returns": rets, 
    275         "Volatility": vols
    276     })
    277     return ef.plot.line(x="Volatility", y="Returns", style=".-")
    278 
    279 
    280 from scipy.optimize import minimize
    281 
    282 def minimize_vol(target_return, er, cov):
    283     """
    284     Returns the optimal weights that achieve the target return
    285     given a set of expected returns and a covariance matrix
    286     """
    287     n = er.shape[0]
    288     init_guess = np.repeat(1/n, n)
    289     bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
    290     # construct the constraints
    291     weights_sum_to_1 = {'type': 'eq',
    292                         'fun': lambda weights: np.sum(weights) - 1
    293     }
    294     return_is_target = {'type': 'eq',
    295                         'args': (er,),
    296                         'fun': lambda weights, er: target_return - portfolio_return(weights,er)
    297     }
    298     weights = minimize(portfolio_vol, init_guess,
    299                        args=(cov,), method='SLSQP',
    300                        options={'disp': False},
    301                        constraints=(weights_sum_to_1,return_is_target),
    302                        bounds=bounds)
    303     return weights.x
    304 
    305 
    306 def msr(riskfree_rate, er, cov):
    307     """
    308     Returns the weights of the portfolio that gives you the maximum sharpe ratio
    309     given the riskfree rate and expected returns and a covariance matrix
    310     """
    311     n = er.shape[0]
    312     init_guess = np.repeat(1/n, n)
    313     bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
    314     # construct the constraints
    315     weights_sum_to_1 = {'type': 'eq',
    316                         'fun': lambda weights: np.sum(weights) - 1
    317     }
    318     def neg_sharpe(weights, riskfree_rate, er, cov):
    319         """
    320         Returns the negative of the sharpe ratio
    321         of the given portfolio
    322         """
    323         r = portfolio_return(weights, er)
    324         vol = portfolio_vol(weights, cov)
    325         return -(r - riskfree_rate)/vol
    326     
    327     weights = minimize(neg_sharpe, init_guess,
    328                        args=(riskfree_rate, er, cov), method='SLSQP',
    329                        options={'disp': False},
    330                        constraints=(weights_sum_to_1,),
    331                        bounds=bounds)
    332     return weights.x
    333 
    334 
    335 def gmv(cov):
    336     """
    337     Returns the weights of the Global Minimum Volatility portfolio
    338     given a covariance matrix
    339     """
    340     n = cov.shape[0]
    341     return msr(0, np.repeat(1, n), cov)
    342 
    343 
    344 def optimal_weights(n_points, er, cov):
    345     """
    346     Returns a list of weights that represent a grid of n_points on the efficient frontier
    347     """
    348     target_rs = np.linspace(er.min(), er.max(), n_points)
    349     weights = [minimize_vol(target_return, er, cov) for target_return in target_rs]
    350     return weights
    351 
    352 
    353 def plot_ef(n_points, er, cov, style='.-', legend=False, show_cml=False, riskfree_rate=0, show_ew=False, show_gmv=False):
    354     """
    355     Plots the multi-asset efficient frontier
    356     """
    357     weights = optimal_weights(n_points, er, cov)
    358     rets = [portfolio_return(w, er) for w in weights]
    359     vols = [portfolio_vol(w, cov) for w in weights]
    360     ef = pd.DataFrame({
    361         "Returns": rets, 
    362         "Volatility": vols
    363     })
    364     ax = ef.plot.line(x="Volatility", y="Returns", style=style, legend=legend)
    365     if show_cml:
    366         ax.set_xlim(left = 0)
    367         # get MSR
    368         w_msr = msr(riskfree_rate, er, cov)
    369         r_msr = portfolio_return(w_msr, er)
    370         vol_msr = portfolio_vol(w_msr, cov)
    371         # add CML
    372         cml_x = [0, vol_msr]
    373         cml_y = [riskfree_rate, r_msr]
    374         ax.plot(cml_x, cml_y, color='green', marker='o', linestyle='dashed', linewidth=2, markersize=10)
    375     if show_ew:
    376         n = er.shape[0]
    377         w_ew = np.repeat(1/n, n)
    378         r_ew = portfolio_return(w_ew, er)
    379         vol_ew = portfolio_vol(w_ew, cov)
    380         # add EW
    381         ax.plot([vol_ew], [r_ew], color='goldenrod', marker='o', markersize=10)
    382     if show_gmv:
    383         w_gmv = gmv(cov)
    384         r_gmv = portfolio_return(w_gmv, er)
    385         vol_gmv = portfolio_vol(w_gmv, cov)
    386         # add EW
    387         ax.plot([vol_gmv], [r_gmv], color='midnightblue', marker='o', markersize=10)
    388         
    389         return ax
    390 
    391                          
    392 def run_cppi(risky_r, safe_r=None, m=3, start=1000, floor=0.8, riskfree_rate=0.03, drawdown=None):
    393     """
    394     Run a backtest of the CPPI strategy, given a set of returns for the risky asset
    395     Returns a dictionary containing: Asset Value History, Risk Budget History, Risky Weight History
    396     """
    397     # set up the CPPI parameters
    398     dates = risky_r.index
    399     n_steps = len(dates)
    400     account_value = start
    401     floor_value = start*floor
    402     peak = account_value
    403     if isinstance(risky_r, pd.Series): 
    404         risky_r = pd.DataFrame(risky_r, columns=["R"])
    405 
    406     if safe_r is None:
    407         safe_r = pd.DataFrame().reindex_like(risky_r)
    408         safe_r.values[:] = riskfree_rate/12 # fast way to set all values to a number
    409     # set up some DataFrames for saving intermediate values
    410     account_history = pd.DataFrame().reindex_like(risky_r)
    411     risky_w_history = pd.DataFrame().reindex_like(risky_r)
    412     cushion_history = pd.DataFrame().reindex_like(risky_r)
    413     floorval_history = pd.DataFrame().reindex_like(risky_r)
    414     peak_history = pd.DataFrame().reindex_like(risky_r)
    415 
    416     for step in range(n_steps):
    417         if drawdown is not None:
    418             peak = np.maximum(peak, account_value)
    419             floor_value = peak*(1-drawdown)
    420         cushion = (account_value - floor_value)/account_value
    421         risky_w = m*cushion
    422         risky_w = np.minimum(risky_w, 1)
    423         risky_w = np.maximum(risky_w, 0)
    424         safe_w = 1-risky_w
    425         risky_alloc = account_value*risky_w
    426         safe_alloc = account_value*safe_w
    427         # recompute the new account value at the end of this step
    428         account_value = risky_alloc*(1+risky_r.iloc[step]) + safe_alloc*(1+safe_r.iloc[step])
    429         # save the histories for analysis and plotting
    430         cushion_history.iloc[step] = cushion
    431         risky_w_history.iloc[step] = risky_w
    432         account_history.iloc[step] = account_value
    433         floorval_history.iloc[step] = floor_value
    434         peak_history.iloc[step] = peak
    435     risky_wealth = start*(1+risky_r).cumprod()
    436     backtest_result = {
    437         "Wealth": account_history,
    438         "Risky Wealth": risky_wealth, 
    439         "Risk Budget": cushion_history,
    440         "Risky Allocation": risky_w_history,
    441         "m": m,
    442         "start": start,
    443         "floor": floor,
    444         "risky_r":risky_r,
    445         "safe_r": safe_r,
    446         "drawdown": drawdown,
    447         "peak": peak_history,
    448         "floor": floorval_history
    449     }
    450     return backtest_result
    451 
    452 
    453 def summary_stats(r, riskfree_rate=0.03):
    454     """
    455     Return a DataFrame that contains aggregated summary stats for the returns in the columns of r
    456     """
    457     ann_r = r.aggregate(annualize_rets, periods_per_year=12)
    458     ann_vol = r.aggregate(annualize_vol, periods_per_year=12)
    459     ann_sr = r.aggregate(sharpe_ratio, riskfree_rate=riskfree_rate, periods_per_year=12)
    460     dd = r.aggregate(lambda r: drawdown(r).Drawdown.min())
    461     skew = r.aggregate(skewness)
    462     kurt = r.aggregate(kurtosis)
    463     cf_var5 = r.aggregate(var_gaussian, modified=True)
    464     hist_cvar5 = r.aggregate(cvar_historic)
    465     return pd.DataFrame({
    466         "Annualized Return": ann_r,
    467         "Annualized Vol": ann_vol,
    468         "Skewness": skew,
    469         "Kurtosis": kurt,
    470         "Cornish-Fisher VaR (5%)": cf_var5,
    471         "Historic CVaR (5%)": hist_cvar5,
    472         "Sharpe Ratio": ann_sr,
    473         "Max Drawdown": dd
    474     })
    475 
    476                          
    477 def gbm(n_years = 10, n_scenarios=1000, mu=0.07, sigma=0.15, steps_per_year=12, s_0=100.0, prices=True):
    478     """
    479     Evolution of Geometric Brownian Motion trajectories, such as for Stock Prices through Monte Carlo
    480     :param n_years:  The number of years to generate data for
    481     :param n_paths: The number of scenarios/trajectories
    482     :param mu: Annualized Drift, e.g. Market Return
    483     :param sigma: Annualized Volatility
    484     :param steps_per_year: granularity of the simulation
    485     :param s_0: initial value
    486     :return: a numpy array of n_paths columns and n_years*steps_per_year rows
    487     """
    488     # Derive per-step Model Parameters from User Specifications
    489     dt = 1/steps_per_year
    490     n_steps = int(n_years*steps_per_year) + 1
    491     # the standard way ...
    492     # rets_plus_1 = np.random.normal(loc=mu*dt+1, scale=sigma*np.sqrt(dt), size=(n_steps, n_scenarios))
    493     # without discretization error ...
    494     rets_plus_1 = np.random.normal(loc=(1+mu)**dt, scale=(sigma*np.sqrt(dt)), size=(n_steps, n_scenarios))
    495     rets_plus_1[0] = 1
    496     ret_val = s_0*pd.DataFrame(rets_plus_1).cumprod() if prices else rets_plus_1-1
    497     return ret_val
    498 
    499                          
    500 import statsmodels.api as sm
    501 def regress(dependent_variable, explanatory_variables, alpha=True):
    502     """
    503     Runs a linear regression to decompose the dependent variable into the explanatory variables
    504     returns an object of type statsmodel's RegressionResults on which you can call
    505        .summary() to print a full summary
    506        .params for the coefficients
    507        .tvalues and .pvalues for the significance levels
    508        .rsquared_adj and .rsquared for quality of fit
    509     """
    510     if alpha:
    511         explanatory_variables = explanatory_variables.copy()
    512         explanatory_variables["Alpha"] = 1
    513     
    514     lm = sm.OLS(dependent_variable, explanatory_variables).fit()
    515     return lm