ml-finance-python

python scripts for finance machine learning

git clone https://9o.is/git/ml-finance-python.git

edhec_risk_kit_203.py

(19208B)


      1 import pandas as pd
      2 import numpy as np
      3 
      4 def get_ffme_returns():
      5     """
      6     Load the Fama-French Dataset for the returns of the Top and Bottom Deciles by MarketCap
      7     """
      8     me_m = pd.read_csv("data/Portfolios_Formed_on_ME_monthly_EW.csv",
      9                        header=0, index_col=0, na_values=-99.99)
     10     rets = me_m[['Lo 10', 'Hi 10']]
     11     rets.columns = ['SmallCap', 'LargeCap']
     12     rets = rets/100
     13     rets.index = pd.to_datetime(rets.index, format="%Y%m").to_period('M')
     14     return rets
     15 
     16 def get_fff_returns():
     17     """
     18     Load the Fama-French Research Factor Monthly Dataset
     19     """
     20     rets = pd.read_csv("data/F-F_Research_Data_Factors_m.csv",
     21                        header=0, index_col=0, na_values=-99.99)/100
     22     rets.index = pd.to_datetime(rets.index, format="%Y%m").to_period('M')
     23     return rets
     24 
     25 
     26 def get_hfi_returns():
     27     """
     28     Load and format the EDHEC Hedge Fund Index Returns
     29     """
     30     hfi = pd.read_csv("data/edhec-hedgefundindices.csv",
     31                       header=0, index_col=0, parse_dates=True)
     32     hfi = hfi/100
     33     hfi.index = hfi.index.to_period('M')
     34     return hfi
     35 
     36 def get_ind_file(filetype, ew=False):
     37     """
     38     Load and format the Ken French 30 Industry Portfolios files
     39     """
     40     known_types = ["returns", "nfirms", "size"]
     41     if filetype not in known_types:
     42         raise ValueError(f"filetype must be one of:{','.join(known_types)}")
     43     if filetype is "returns":
     44         name = "ew_rets" if ew else "vw_rets"
     45         divisor = 100
     46     elif filetype is "nfirms":
     47         name = "nfirms"
     48         divisor = 1
     49     elif filetype is "size":
     50         name = "size"
     51         divisor = 1
     52                          
     53     ind = pd.read_csv(f"data/ind30_m_{name}.csv", header=0, index_col=0)/divisor
     54     ind.index = pd.to_datetime(ind.index, format="%Y%m").to_period('M')
     55     ind.columns = ind.columns.str.strip()
     56     return ind
     57 
     58 def get_ind_returns(ew=False):
     59     """
     60     Load and format the Ken French 30 Industry Portfolios Value Weighted Monthly Returns
     61     """
     62     return get_ind_file("returns", ew=ew)
     63 
     64 def get_ind_nfirms():
     65     """
     66     Load and format the Ken French 30 Industry Portfolios Average number of Firms
     67     """
     68     return get_ind_file("nfirms")
     69 
     70 def get_ind_size():
     71     """
     72     Load and format the Ken French 30 Industry Portfolios Average size (market cap)
     73     """
     74     return get_ind_file("size")
     75 
     76                          
     77 def get_total_market_index_returns():
     78     """
     79     Load the 30 industry portfolio data and derive the returns of a capweighted total market index
     80     """
     81     ind_nfirms = get_ind_nfirms()
     82     ind_size = get_ind_size()
     83     ind_return = get_ind_returns()
     84     ind_mktcap = ind_nfirms * ind_size
     85     total_mktcap = ind_mktcap.sum(axis=1)
     86     ind_capweight = ind_mktcap.divide(total_mktcap, axis="rows")
     87     total_market_return = (ind_capweight * ind_return).sum(axis="columns")
     88     return total_market_return
     89                          
     90 def skewness(r):
     91     """
     92     Alternative to scipy.stats.skew()
     93     Computes the skewness of the supplied Series or DataFrame
     94     Returns a float or a Series
     95     """
     96     demeaned_r = r - r.mean()
     97     # use the population standard deviation, so set dof=0
     98     sigma_r = r.std(ddof=0)
     99     exp = (demeaned_r**3).mean()
    100     return exp/sigma_r**3
    101 
    102 
    103 def kurtosis(r):
    104     """
    105     Alternative to scipy.stats.kurtosis()
    106     Computes the kurtosis of the supplied Series or DataFrame
    107     Returns a float or a Series
    108     """
    109     demeaned_r = r - r.mean()
    110     # use the population standard deviation, so set dof=0
    111     sigma_r = r.std(ddof=0)
    112     exp = (demeaned_r**4).mean()
    113     return exp/sigma_r**4
    114 
    115 
    116 def compound(r):
    117     """
    118     returns the result of compounding the set of returns in r
    119     """
    120     return np.expm1(np.log1p(r).sum())
    121 
    122                          
    123 def annualize_rets(r, periods_per_year):
    124     """
    125     Annualizes a set of returns
    126     We should infer the periods per year
    127     but that is currently left as an exercise
    128     to the reader :-)
    129     """
    130     compounded_growth = (1+r).prod()
    131     n_periods = r.shape[0]
    132     return compounded_growth**(periods_per_year/n_periods)-1
    133 
    134 
    135 def annualize_vol(r, periods_per_year):
    136     """
    137     Annualizes the vol of a set of returns
    138     We should infer the periods per year
    139     but that is currently left as an exercise
    140     to the reader :-)
    141     """
    142     return r.std()*(periods_per_year**0.5)
    143 
    144 
    145 def sharpe_ratio(r, riskfree_rate, periods_per_year):
    146     """
    147     Computes the annualized sharpe ratio of a set of returns
    148     """
    149     # convert the annual riskfree rate to per period
    150     rf_per_period = (1+riskfree_rate)**(1/periods_per_year)-1
    151     excess_ret = r - rf_per_period
    152     ann_ex_ret = annualize_rets(excess_ret, periods_per_year)
    153     ann_vol = annualize_vol(r, periods_per_year)
    154     return ann_ex_ret/ann_vol
    155 
    156 
    157 import scipy.stats
    158 def is_normal(r, level=0.01):
    159     """
    160     Applies the Jarque-Bera test to determine if a Series is normal or not
    161     Test is applied at the 1% level by default
    162     Returns True if the hypothesis of normality is accepted, False otherwise
    163     """
    164     if isinstance(r, pd.DataFrame):
    165         return r.aggregate(is_normal)
    166     else:
    167         statistic, p_value = scipy.stats.jarque_bera(r)
    168         return p_value > level
    169 
    170 
    171 def drawdown(return_series: pd.Series):
    172     """Takes a time series of asset returns.
    173        returns a DataFrame with columns for
    174        the wealth index, 
    175        the previous peaks, and 
    176        the percentage drawdown
    177     """
    178     wealth_index = 1000*(1+return_series).cumprod()
    179     previous_peaks = wealth_index.cummax()
    180     drawdowns = (wealth_index - previous_peaks)/previous_peaks
    181     return pd.DataFrame({"Wealth": wealth_index, 
    182                          "Previous Peak": previous_peaks, 
    183                          "Drawdown": drawdowns})
    184 
    185 
    186 def semideviation(r):
    187     """
    188     Returns the semideviation aka negative semideviation of r
    189     r must be a Series or a DataFrame, else raises a TypeError
    190     """
    191     if isinstance(r, pd.Series):
    192         is_negative = r < 0
    193         return r[is_negative].std(ddof=0)
    194     elif isinstance(r, pd.DataFrame):
    195         return r.aggregate(semideviation)
    196     else:
    197         raise TypeError("Expected r to be a Series or DataFrame")
    198 
    199 
    200 def var_historic(r, level=5):
    201     """
    202     Returns the historic Value at Risk at a specified level
    203     i.e. returns the number such that "level" percent of the returns
    204     fall below that number, and the (100-level) percent are above
    205     """
    206     if isinstance(r, pd.DataFrame):
    207         return r.aggregate(var_historic, level=level)
    208     elif isinstance(r, pd.Series):
    209         return -np.percentile(r, level)
    210     else:
    211         raise TypeError("Expected r to be a Series or DataFrame")
    212 
    213 
    214 def cvar_historic(r, level=5):
    215     """
    216     Computes the Conditional VaR of Series or DataFrame
    217     """
    218     if isinstance(r, pd.Series):
    219         is_beyond = r <= var_historic(r, level=level)
    220         return -r[is_beyond].mean()
    221     elif isinstance(r, pd.DataFrame):
    222         return r.aggregate(cvar_historic, level=level)
    223     else:
    224         raise TypeError("Expected r to be a Series or DataFrame")
    225 
    226 
    227 from scipy.stats import norm
    228 def var_gaussian(r, level=5, modified=False):
    229     """
    230     Returns the Parametric Gauusian VaR of a Series or DataFrame
    231     If "modified" is True, then the modified VaR is returned,
    232     using the Cornish-Fisher modification
    233     """
    234     # compute the Z score assuming it was Gaussian
    235     z = norm.ppf(level/100)
    236     if modified:
    237         # modify the Z score based on observed skewness and kurtosis
    238         s = skewness(r)
    239         k = kurtosis(r)
    240         z = (z +
    241                 (z**2 - 1)*s/6 +
    242                 (z**3 -3*z)*(k-3)/24 -
    243                 (2*z**3 - 5*z)*(s**2)/36
    244             )
    245     return -(r.mean() + z*r.std(ddof=0))
    246 
    247 
    248 def portfolio_return(weights, returns):
    249     """
    250     Computes the return on a portfolio from constituent returns and weights
    251     weights are a numpy array or Nx1 matrix and returns are a numpy array or Nx1 matrix
    252     """
    253     return weights.T @ returns
    254 
    255 
    256 def portfolio_vol(weights, covmat):
    257     """
    258     Computes the vol of a portfolio from a covariance matrix and constituent weights
    259     weights are a numpy array or N x 1 maxtrix and covmat is an N x N matrix
    260     """
    261     return (weights.T @ covmat @ weights)**0.5
    262 
    263 
    264 def plot_ef2(n_points, er, cov):
    265     """
    266     Plots the 2-asset efficient frontier
    267     """
    268     if er.shape[0] != 2 or er.shape[0] != 2:
    269         raise ValueError("plot_ef2 can only plot 2-asset frontiers")
    270     weights = [np.array([w, 1-w]) for w in np.linspace(0, 1, n_points)]
    271     rets = [portfolio_return(w, er) for w in weights]
    272     vols = [portfolio_vol(w, cov) for w in weights]
    273     ef = pd.DataFrame({
    274         "Returns": rets, 
    275         "Volatility": vols
    276     })
    277     return ef.plot.line(x="Volatility", y="Returns", style=".-")
    278 
    279 
    280 from scipy.optimize import minimize
    281 
    282 def minimize_vol(target_return, er, cov):
    283     """
    284     Returns the optimal weights that achieve the target return
    285     given a set of expected returns and a covariance matrix
    286     """
    287     n = er.shape[0]
    288     init_guess = np.repeat(1/n, n)
    289     bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
    290     # construct the constraints
    291     weights_sum_to_1 = {'type': 'eq',
    292                         'fun': lambda weights: np.sum(weights) - 1
    293     }
    294     return_is_target = {'type': 'eq',
    295                         'args': (er,),
    296                         'fun': lambda weights, er: target_return - portfolio_return(weights,er)
    297     }
    298     weights = minimize(portfolio_vol, init_guess,
    299                        args=(cov,), method='SLSQP',
    300                        options={'disp': False},
    301                        constraints=(weights_sum_to_1,return_is_target),
    302                        bounds=bounds)
    303     return weights.x
    304 
    305 
    306 def tracking_error(r_a, r_b):
    307     """
    308     Returns the Tracking Error between the two return series
    309     """
    310     return np.sqrt(((r_a - r_b)**2).sum())
    311 
    312                          
    313 def msr(riskfree_rate, er, cov):
    314     """
    315     Returns the weights of the portfolio that gives you the maximum sharpe ratio
    316     given the riskfree rate and expected returns and a covariance matrix
    317     """
    318     n = er.shape[0]
    319     init_guess = np.repeat(1/n, n)
    320     bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
    321     # construct the constraints
    322     weights_sum_to_1 = {'type': 'eq',
    323                         'fun': lambda weights: np.sum(weights) - 1
    324     }
    325     def neg_sharpe(weights, riskfree_rate, er, cov):
    326         """
    327         Returns the negative of the sharpe ratio
    328         of the given portfolio
    329         """
    330         r = portfolio_return(weights, er)
    331         vol = portfolio_vol(weights, cov)
    332         return -(r - riskfree_rate)/vol
    333     
    334     weights = minimize(neg_sharpe, init_guess,
    335                        args=(riskfree_rate, er, cov), method='SLSQP',
    336                        options={'disp': False},
    337                        constraints=(weights_sum_to_1,),
    338                        bounds=bounds)
    339     return weights.x
    340 
    341 
    342 def gmv(cov):
    343     """
    344     Returns the weights of the Global Minimum Volatility portfolio
    345     given a covariance matrix
    346     """
    347     n = cov.shape[0]
    348     return msr(0, np.repeat(1, n), cov)
    349 
    350 
    351 def optimal_weights(n_points, er, cov):
    352     """
    353     Returns a list of weights that represent a grid of n_points on the efficient frontier
    354     """
    355     target_rs = np.linspace(er.min(), er.max(), n_points)
    356     weights = [minimize_vol(target_return, er, cov) for target_return in target_rs]
    357     return weights
    358 
    359 
    360 def plot_ef(n_points, er, cov, style='.-', legend=False, show_cml=False, riskfree_rate=0, show_ew=False, show_gmv=False):
    361     """
    362     Plots the multi-asset efficient frontier
    363     """
    364     weights = optimal_weights(n_points, er, cov)
    365     rets = [portfolio_return(w, er) for w in weights]
    366     vols = [portfolio_vol(w, cov) for w in weights]
    367     ef = pd.DataFrame({
    368         "Returns": rets, 
    369         "Volatility": vols
    370     })
    371     ax = ef.plot.line(x="Volatility", y="Returns", style=style, legend=legend)
    372     if show_cml:
    373         ax.set_xlim(left = 0)
    374         # get MSR
    375         w_msr = msr(riskfree_rate, er, cov)
    376         r_msr = portfolio_return(w_msr, er)
    377         vol_msr = portfolio_vol(w_msr, cov)
    378         # add CML
    379         cml_x = [0, vol_msr]
    380         cml_y = [riskfree_rate, r_msr]
    381         ax.plot(cml_x, cml_y, color='green', marker='o', linestyle='dashed', linewidth=2, markersize=10)
    382     if show_ew:
    383         n = er.shape[0]
    384         w_ew = np.repeat(1/n, n)
    385         r_ew = portfolio_return(w_ew, er)
    386         vol_ew = portfolio_vol(w_ew, cov)
    387         # add EW
    388         ax.plot([vol_ew], [r_ew], color='goldenrod', marker='o', markersize=10)
    389     if show_gmv:
    390         w_gmv = gmv(cov)
    391         r_gmv = portfolio_return(w_gmv, er)
    392         vol_gmv = portfolio_vol(w_gmv, cov)
    393         # add EW
    394         ax.plot([vol_gmv], [r_gmv], color='midnightblue', marker='o', markersize=10)
    395         
    396         return ax
    397 
    398                          
    399 def run_cppi(risky_r, safe_r=None, m=3, start=1000, floor=0.8, riskfree_rate=0.03, drawdown=None):
    400     """
    401     Run a backtest of the CPPI strategy, given a set of returns for the risky asset
    402     Returns a dictionary containing: Asset Value History, Risk Budget History, Risky Weight History
    403     """
    404     # set up the CPPI parameters
    405     dates = risky_r.index
    406     n_steps = len(dates)
    407     account_value = start
    408     floor_value = start*floor
    409     peak = account_value
    410     if isinstance(risky_r, pd.Series): 
    411         risky_r = pd.DataFrame(risky_r, columns=["R"])
    412 
    413     if safe_r is None:
    414         safe_r = pd.DataFrame().reindex_like(risky_r)
    415         safe_r.values[:] = riskfree_rate/12 # fast way to set all values to a number
    416     # set up some DataFrames for saving intermediate values
    417     account_history = pd.DataFrame().reindex_like(risky_r)
    418     risky_w_history = pd.DataFrame().reindex_like(risky_r)
    419     cushion_history = pd.DataFrame().reindex_like(risky_r)
    420     floorval_history = pd.DataFrame().reindex_like(risky_r)
    421     peak_history = pd.DataFrame().reindex_like(risky_r)
    422 
    423     for step in range(n_steps):
    424         if drawdown is not None:
    425             peak = np.maximum(peak, account_value)
    426             floor_value = peak*(1-drawdown)
    427         cushion = (account_value - floor_value)/account_value
    428         risky_w = m*cushion
    429         risky_w = np.minimum(risky_w, 1)
    430         risky_w = np.maximum(risky_w, 0)
    431         safe_w = 1-risky_w
    432         risky_alloc = account_value*risky_w
    433         safe_alloc = account_value*safe_w
    434         # recompute the new account value at the end of this step
    435         account_value = risky_alloc*(1+risky_r.iloc[step]) + safe_alloc*(1+safe_r.iloc[step])
    436         # save the histories for analysis and plotting
    437         cushion_history.iloc[step] = cushion
    438         risky_w_history.iloc[step] = risky_w
    439         account_history.iloc[step] = account_value
    440         floorval_history.iloc[step] = floor_value
    441         peak_history.iloc[step] = peak
    442     risky_wealth = start*(1+risky_r).cumprod()
    443     backtest_result = {
    444         "Wealth": account_history,
    445         "Risky Wealth": risky_wealth, 
    446         "Risk Budget": cushion_history,
    447         "Risky Allocation": risky_w_history,
    448         "m": m,
    449         "start": start,
    450         "floor": floor,
    451         "risky_r":risky_r,
    452         "safe_r": safe_r,
    453         "drawdown": drawdown,
    454         "peak": peak_history,
    455         "floor": floorval_history
    456     }
    457     return backtest_result
    458 
    459 
    460 def summary_stats(r, riskfree_rate=0.03):
    461     """
    462     Return a DataFrame that contains aggregated summary stats for the returns in the columns of r
    463     """
    464     ann_r = r.aggregate(annualize_rets, periods_per_year=12)
    465     ann_vol = r.aggregate(annualize_vol, periods_per_year=12)
    466     ann_sr = r.aggregate(sharpe_ratio, riskfree_rate=riskfree_rate, periods_per_year=12)
    467     dd = r.aggregate(lambda r: drawdown(r).Drawdown.min())
    468     skew = r.aggregate(skewness)
    469     kurt = r.aggregate(kurtosis)
    470     cf_var5 = r.aggregate(var_gaussian, modified=True)
    471     hist_cvar5 = r.aggregate(cvar_historic)
    472     return pd.DataFrame({
    473         "Annualized Return": ann_r,
    474         "Annualized Vol": ann_vol,
    475         "Skewness": skew,
    476         "Kurtosis": kurt,
    477         "Cornish-Fisher VaR (5%)": cf_var5,
    478         "Historic CVaR (5%)": hist_cvar5,
    479         "Sharpe Ratio": ann_sr,
    480         "Max Drawdown": dd
    481     })
    482 
    483                          
    484 def gbm(n_years = 10, n_scenarios=1000, mu=0.07, sigma=0.15, steps_per_year=12, s_0=100.0, prices=True):
    485     """
    486     Evolution of Geometric Brownian Motion trajectories, such as for Stock Prices through Monte Carlo
    487     :param n_years:  The number of years to generate data for
    488     :param n_paths: The number of scenarios/trajectories
    489     :param mu: Annualized Drift, e.g. Market Return
    490     :param sigma: Annualized Volatility
    491     :param steps_per_year: granularity of the simulation
    492     :param s_0: initial value
    493     :return: a numpy array of n_paths columns and n_years*steps_per_year rows
    494     """
    495     # Derive per-step Model Parameters from User Specifications
    496     dt = 1/steps_per_year
    497     n_steps = int(n_years*steps_per_year) + 1
    498     # the standard way ...
    499     # rets_plus_1 = np.random.normal(loc=mu*dt+1, scale=sigma*np.sqrt(dt), size=(n_steps, n_scenarios))
    500     # without discretization error ...
    501     rets_plus_1 = np.random.normal(loc=(1+mu)**dt, scale=(sigma*np.sqrt(dt)), size=(n_steps, n_scenarios))
    502     rets_plus_1[0] = 1
    503     ret_val = s_0*pd.DataFrame(rets_plus_1).cumprod() if prices else rets_plus_1-1
    504     return ret_val
    505 
    506                          
    507 import statsmodels.api as sm
    508 def regress(dependent_variable, explanatory_variables, alpha=True):
    509     """
    510     Runs a linear regression to decompose the dependent variable into the explanatory variables
    511     returns an object of type statsmodel's RegressionResults on which you can call
    512        .summary() to print a full summary
    513        .params for the coefficients
    514        .tvalues and .pvalues for the significance levels
    515        .rsquared_adj and .rsquared for quality of fit
    516     """
    517     if alpha:
    518         explanatory_variables = explanatory_variables.copy()
    519         explanatory_variables["Alpha"] = 1
    520     
    521     lm = sm.OLS(dependent_variable, explanatory_variables).fit()
    522     return lm
    523 
    524 def portfolio_tracking_error(weights, ref_r, bb_r):
    525     """
    526     returns the tracking error between the reference returns
    527     and a portfolio of building block returns held with given weights
    528     """
    529     return tracking_error(ref_r, (weights*bb_r).sum(axis=1))
    530                          
    531 def style_analysis(dependent_variable, explanatory_variables):
    532     """
    533     Returns the optimal weights that minimizes the Tracking error between
    534     a portfolio of the explanatory variables and the dependent variable
    535     """
    536     n = explanatory_variables.shape[1]
    537     init_guess = np.repeat(1/n, n)
    538     bounds = ((0.0, 1.0),) * n # an N-tuple of 2-tuples!
    539     # construct the constraints
    540     weights_sum_to_1 = {'type': 'eq',
    541                         'fun': lambda weights: np.sum(weights) - 1
    542     }
    543     solution = minimize(portfolio_tracking_error, init_guess,
    544                        args=(dependent_variable, explanatory_variables,), method='SLSQP',
    545                        options={'disp': False},
    546                        constraints=(weights_sum_to_1,),
    547                        bounds=bounds)
    548     weights = pd.Series(solution.x, index=explanatory_variables.columns)
    549     return weights
    550 
    551     
    552