看到社群大佬对这个策略已经有很好的诠释了,回测数据看起来也确实厉害。出于兴趣,我自己做了一下分析:感觉这个指标抓上涨很有效,对下跌的判断不如对上涨敏感,但用来规避大熊市感觉还是可以的。
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
# Security code handed to get_price below.
security = '000001.XSHG'
# get_price is neither defined nor imported in this file; presumably it is
# injected by the hosting research platform -- TODO confirm.
# The positional arguments look like (security, start date, end date,
# frequency, fields); verify against that platform's API.
prices = get_price(security, '2005-01-05', '2019-01-01', '1d', ['high', 'low', 'open'])
# NOTE(review): `long` shadows the Python 2 builtin of the same name; it is
# read by later top-level code, so renaming it must be done file-wide.
long = 18 #beta calculation period
def caculate_beta(long, prices):
    """Fit a rolling regression of ``high`` on ``low`` and derive slope series.

    For every row ``i`` from ``long`` up to ``len(prices) - 1`` the ``high``
    column is regressed on the ``low`` column over the ``long`` rows that
    precede ``i`` (row ``i`` itself is not part of its own window).

    Args:
        long: Number of rows in each regression window.
        prices: DataFrame exposing ``low`` and ``high`` columns.

    Returns:
        Tuple ``(beta_list, beta_std, beta_rightdev)`` of arrays with shape
        ``(len(prices) - long, 1)``:

        * ``beta_list`` -- fitted slope of each window;
        * ``beta_std`` -- ``beta_list`` passed through ``StandardScaler``;
        * ``beta_rightdev`` -- element-wise ``score * beta * beta_std``, where
          ``score`` is ``LinearRegression.score`` on the window itself.
    """
    # Extract plain ndarrays once. Calling .reshape on a pandas Series is not
    # supported by current pandas, whereas ndarray slices reshape everywhere.
    low = prices.low.values
    high = prices.high.values

    beta_list = []  # slope per window
    R_list = []  # regression score per window
    for i in range(long, len(prices)):
        X = low[i - long:i].reshape(-1, 1)
        y = high[i - long:i].reshape(-1, 1)
        reg = LinearRegression().fit(X, y)
        # y is a single column, so coef_ has shape (1, 1).
        beta_list.append(reg.coef_[0][0])
        R_list.append(reg.score(X, y))

    R_list = np.asarray(R_list).reshape(-1, 1)
    beta_list = np.asarray(beta_list).reshape(-1, 1)

    # NOTE: the scaler is fitted on the complete slope series, so every
    # standardized value depends on observations that come after it.
    scaler = StandardScaler()
    beta_std = scaler.fit_transform(beta_list)

    beta_rightdev = R_list * beta_list * beta_std
    return (beta_list, beta_std, beta_rightdev)
# Run the rolling regression once over the downloaded bars.
beta_list, beta_std, beta_rightdev = caculate_beta(long, prices)

# Plain distribution of the standardized slope, then of the combined score.
for sample in (beta_std, beta_rightdev):
    sns.distplot(sample)
    plt.show()

# Same score again, drawn as a cumulative histogram with a cumulative KDE.
sns.distplot(
    beta_rightdev,
    hist_kws={'cumulative': True},
    kde_kws={'cumulative': True},
)
plt.show()
# Row offsets for which calculate_return_df builds one return column each.
# NOTE: the same name is rebound further down to beta_rightdev cut-offs.
hold_period = [10,30,60,90]
def calculate_return_df(price, hold_period):
    """Build a frame of forward returns for several holding horizons.

    For each horizon ``i`` a column ``'<i>_days_return'`` is added whose value
    at row ``t`` is ``price[t + i] / price[t] - 1``. The last ``i`` rows have
    no future price and are therefore NaN.

    Args:
        price: Price Series (its name becomes the first column).
        hold_period: Iterable of integer horizons, counted in rows.

    Returns:
        DataFrame holding the original series followed by one return column
        per horizon, indexed like ``price``.
    """
    frame = pd.DataFrame(price)
    for i in hold_period:
        # shift(-i) lines the future price up with the current row, which
        # keeps alignment correct even when the series has missing values
        # and names the column from the horizon itself.
        forward_return = price.shift(-i) / price - 1
        frame[str(i) + '_days_return'] = forward_return.values
    return frame
# Forward returns of the open price for every horizon in hold_period.
return_df = calculate_return_df(prices.open, hold_period)
return_df.head()

# caculate_beta emits one value per row starting at row `long`, so each
# series is `long` entries shorter than the price frame. Left-pad with NaN in
# one concatenate (instead of prepending one element at a time) so the values
# line up with the rows they belong to.
warmup = np.full(long, np.nan)
beta_list = np.concatenate([warmup, beta_list.reshape(-1)])
beta_std = np.concatenate([warmup, beta_std.reshape(-1)])
beta_rightdev = np.concatenate([warmup, beta_rightdev.reshape(-1)])

return_df['beta'] = beta_list
return_df['beta_standardized'] = beta_std
return_df['beta_rightdev'] = beta_rightdev
return_df.head()

# Drop every row that has a NaN: the padded warm-up rows at the start and
# the rows at the end whose forward returns are undefined.
return_df = return_df.dropna()
return_df.head()
# Return columns (grid columns) and slope-derived factors (grid rows).
y_list = ['10_days_return','30_days_return','60_days_return','90_days_return']
X_list = ['beta','beta_standardized','beta_rightdev']

# One scatter per (factor, horizon) pair. The grid is sized from the lists,
# and no extra plt.figure(...) call is made afterwards: that would open a
# second, empty figure next to the grid.
fig, axes = plt.subplots(nrows=len(X_list), ncols=len(y_list), figsize=(30, 20))
for row, factor in enumerate(X_list):
    for col, horizon in enumerate(y_list):
        axes[row][col].scatter(return_df[factor], return_df[horizon])
plt.show()
def plot_df(return_df, variable, bins=20, return_columns=None):
    """Plot the mean of each return column per equal-width bucket of a factor.

    Args:
        return_df: DataFrame containing ``variable`` and the return columns.
        variable: Name of the column that is cut into buckets.
        bins: Number of buckets handed to ``pd.cut`` (default 20).
        return_columns: Return columns to average; defaults to the 10/30/60/90
            day return columns.

    Returns:
        None. The grouped means are drawn with ``DataFrame.plot`` and shown.
    """
    if return_columns is None:
        return_columns = ['10_days_return', '30_days_return',
                          '60_days_return', '90_days_return']

    analysis_df = return_df.loc[:, [variable] + list(return_columns)]
    buckets = pd.cut(analysis_df[variable], bins)
    # The factor's own per-bucket mean is not plotted, only the returns.
    group_df = analysis_df.groupby(buckets).mean().drop([variable], axis=1)
    group_df.plot()
    plt.show()
# Bucketed mean-return chart for each of the three factors, in turn.
for factor_name in ('beta', 'beta_standardized', 'beta_rightdev'):
    plot_df(return_df, factor_name)
return_df.head()
# Boolean frame: for the rows whose beta_rightdev is below -0.9, whether each
# return column in y_list is positive.
df = return_df[return_df.beta_rightdev<-0.9].loc[:,y_list]>0
# Count of True per column divided by the number of selected rows.
# Bare expression: the result is not stored anywhere.
df.sum()/len(df)
# Rebinds hold_period: from here on it holds beta_rightdev cut-offs that are
# passed to bet_probability, not row horizons.
hold_period = [0.7,1,1.5]
def bet_probability(hold_period, sign):
    """Print the share of positive returns in one tail of ``beta_rightdev``.

    Reads the module-level ``return_df`` and ``y_list``. For every cut-off a
    per-column fraction of positive returns and the number of selected rows
    are printed, followed by a separator line.

    Args:
        hold_period: Iterable of ``beta_rightdev`` cut-offs (despite the
            name, these are thresholds, not holding periods).
        sign: If greater than 0, rows with ``beta_rightdev > cutoff`` are
            selected; otherwise rows with ``beta_rightdev < -cutoff``.

    Returns:
        None.
    """
    for cutoff in hold_period:
        if sign > 0:
            selected = return_df.beta_rightdev > cutoff
        else:
            selected = return_df.beta_rightdev < cutoff * -1
        df = return_df[selected].loc[:, y_list] > 0

        # Single-argument print(...) behaves the same on Python 2 and 3;
        # float() forces true division on either interpreter.
        print(df.sum() / float(len(df)))
        print("")
        print('sample size is ' + str(len(df)))
        print("")
        print(">>>>>>>>>>>>>>>>>")
# Upper tail: hold_period currently holds the cut-offs 0.7, 1 and 1.5.
bet_probability(hold_period,1)
# Rebind to the cut-off magnitudes for the lower tail; bet_probability
# multiplies each by -1 when sign is not greater than 0.
hold_period = [0.7,0.9,1.1]
bet_probability(hold_period,-1)