前几天小编催我也写一个多因子策略,无奈我还没写到策略这一步,而且肚里没水很是尴尬啊 哈哈 想想每一阶段我差不多也要写一篇总结贴,因此就有了此文,此文的目的是实现了一个快速多因子测试框架,在这个框架下聚宽100多个因子池实现多因子回测不会超过30秒,哦哦 你可以尽情发挥你的想象力,一天做几千次测试验证,只要你有灵感!
因为本人数学理论方面并不擅长因子不在算法方面多做讨论,只着重于应用于实现。本文实现了简单打分法的因子组合测试,大家可以在这个框架下继续添加各种算法,很多人都知道细节是魔鬼,因为本文仅仅是随意选了因子库的前10个,所以回测结果并不理想,貌似仅仅是单调性好了一些,另外我也不想为这个演示贴花费太多精力,因此匆忙写就,细节部分的调整需要各位自行努力啦。
文中一些算法仅仅使用了最简单的方式,因为复杂算法实现在演示贴很困难 ,如需要附加多个算法库,如果用通用代码实现需要写太多代码,况且很多算法在论坛里都有,大家自行解决吧
1 相关系数计算,文中计算使用了简单的顺序相关系数,可改为加权相关系数更好。
2 因子贡献度, 文中只使用了因子1组和末组差来代替,更好的做法是,(1组-末组收益) / (市场1组-末组收益)
3 因子方向计算,文中使用因子累计超额方向来定义,可手工定义经济意义方向。
4 因子筛选排名只使用了因子贡献及因子ir ,大家可自行加入其他参数,如回撤,icp值,其他检验指标,夏普率等,可选几十项。
5 因子排名打分本文仅使用了等权累计,可选其他权重组合方式。
总之多因子策略相关细节很多很多,遍历一遍目录都需要半天别说全部写出来了,所以需要挨个测试实验体会。
本文数据部分需要大家自己解决。因为上传我也不知道上传到哪里。再说数据本身很简单没啥特殊的。
factors 是一个列表。
factors = [价格, 因子1, 因子2, 因子3。。。。]
最后提供了本文需要的全套矢量计算代码,基本涵盖了因子计算的需要,因为使用numpy方式写成,可能不太好理解。
所以如果很费劲可以自行使用习惯的方式替换本文的实现函数。
import time
import datetime
import jqdata
import datetime
from jqfactor import Factor,calc_factors,winsorize,standardlize
import pandas as pd
import statsmodels.api as sm
import scipy.stats as st
import pickle
pkl_file = open('My10.pkl', 'rb')
factors = pickle.load(pkl_file)
简单说明下数据¶
本次演示参与数据:¶
factors[0] : 价格数据, 300指数股. 09年1月4日开始,每周收盘价 (标准df格式)¶
factors[1:10] : 因子数据, 聚宽因子库前10个,周期同上 (标准数组格式)¶
=
factors[0]
print factors[1].shape
factors[1]
现在测试下本次演示用到的函数 (源代码在本文最后)¶
ford = pct_change(factors[0].values,-1,extre=True) # 计算前向收益率,同时去极值
deme = demeaned(ford,1) # 计算等权超额收益
facs,cuts = standard(factors[1],factors[0],q=5) # 因子标准化分组
mask = np.isfinite(ford) & np.isfinite(facs) # 收集收益率及因子中的有效值标记
h = bincount(cuts,1,ford)[:,0:-1]/bincount(cuts,1,mask)[:,0:-1] # 分组统计收益/组数量
p = bincount(cuts,1,deme)[:,0:-1]/bincount(cuts,1,mask)[:,0:-1] # 分组统计收益/组数量
# 绘图
plt.figure(figsize=(12,3))
_ = plt.plot(np.nancumprod(h + 1, 0) - 1) # 输出因子分层统计(分组收益)
plt.figure(figsize=(12,3))
_ = plt.plot(np.nancumprod(p + 1, 0) - 1) # 输出因子分层统计(超额收益)
这里为每个因子定义方向¶
fsign = []
ford = pct_change(factors[0].values,-1,extre=True) # 计算前向收益率,同时去极值
deme = demeaned(ford,1) # 收益率去均值
for f in factors[1:]:
# 这里处理因子对因子去极值标准化后分组,并返回标准化后的因子及分组表
facs,cuts = standard(f,factors[0],q=5) # 因子标准化分组
# 这里收集数据中有效值标记,在后面计算中用到
mask = np.isfinite(ford) & np.isfinite(facs) # 收集收益率及因子中的有效值标记
# 汇总横截面每个分组的总收益 / 每个分组中有效元素的总是, 取得组均收益
# [:,0:-1] 前面分组中将无效数据单分为一类,这样不需要单独标记无效值,
# 因此可以用int8类型,相对只需内存占用1/8,因此这里取值避开最后无效组
h = bincount(cuts,1,deme)[:,0:-1]/bincount(cuts,1,mask)[:,0:-1] # 分组统计收益/组数量
# 这里计算每个因子的分组总收益
m = np.nanprod(h + 1, 0) - 1
# 收集每个因子方向
fsign.append(m[0]<m[-1])
fsign
重定义方向后的因子分层统计¶
ford = pct_change(factors[0].values,-1,extre=True) # 计算前向收益率,同时去极值
for f,sign in zip(factors[1:],fsign):
# 分组时按经济意义确定方向(这里按fsign中的定义)
facs,cuts = standard(f,factors[0],asc=sign,q=5) # 因子标准化分组
mask = np.isfinite(ford) & np.isfinite(facs) # 收集收益率及因子中的有效值标记
h = bincount(cuts,1,ford)[:,0:-1]/bincount(cuts,1,mask)[:,0:-1] # 分组统计收益/组数量
plt.figure(figsize=(12,2))
_ = plt.plot(np.nancumprod(h + 1, 0) - 1) # 输出因子分层统计(分组收益)
因子合成之因子打分¶
ford = pct_change(factors[0].values,-1,extre=True) # 计算前向收益率,同时去极值
deme = demeaned(ford,1) # 收益率去均值
mask = np.isfinite(ford) # 收集市场收益有效
cutr = standard(ford,q=5)[-1] # 前向收益分层
fdsc = []; ficr = []
for f,sign in zip(factors[1:],fsign):
# 分组时按经济意义确定方向(这里按 fsign 中的定义)
facs,cuts = standard(f,factors[0],asc=sign,q=5) # 因子标准化分组
mask = np.isfinite(ford) & np.isfinite(facs) # 收集收益率及因子中的有效值标记
h = bincount(cuts,1,ford)[:,0:-1]/bincount(cuts,1,mask)[:,0:-1] # 分组统计收益/组数量
# 一组收益与末组收益差
disw = pd.rolling_mean(pd.DataFrame(h[:,0]-h[:,-1]),window=24,min_periods=1)
# 计算ic-ir
ic = pearsonr(facs,ford,1,rank=True,pct=1) # 计算因子与前向收益的截面相关
ic_mean = pd.rolling_mean(pd.DataFrame(ic),window=24,min_periods = 1)
ic_std = pd.rolling_std(pd.DataFrame(ic),window=24,min_periods = 1)
icir = ic_mean/ic_std
# 收集每个因子贡献及ir
fdsc.append(disw)
ficr.append(icir)
# 计算排名 (注意 asc 定义了排名方向, asc=1 表示 ir 值越大越好, 否则 = 0)
fdsc = argsort(np.c_[tuple(fdsc)],rank=1,asc=1,pct=1,fillv=0,dtype=int8)[-1]# 所有因子贡献排名
ficr = argsort(np.c_[tuple(ficr)],rank=1,asc=1,pct=1,fillv=0,dtype=int8)[-1]# 所有因子ir排名
# 简单加权组合
fank = (fdsc*0.5)+(ficr*0.5)
print fank.shape
测试合成因子¶
close = factors[0].values
farg = argsort(fank,axis=1)[:,-5:] # 对因子打分表排序,并返回最大的5个因子
scre = np.zeros(shape=close.shape) # 生成一个分值表
# ================== 这里使用选择的因子为个股打分 ====================
for fx in np.unique(farg): # 遍历所有用到的因子索引
rows = (farg==fx).any(1) # 取得因子有效行(该因子在哪些周期被选中)
if rows.any():
factor = factors[fx+1]
cutr = standard(factor[rows],close[rows],q=5,asc=fsign[fx])[-1] # 因子分层
cutr = cutr * fank[:,fx][rows][:,None]
# 合成新的打分因子
scre[rows] += cutr
# ==================================================================
# 合成因子不能直接用于回测(因为前天因子与昨天收益得到的结果差了2期,
# 因此合成因子需要延迟2期,避免未来函数)
scre = shift(scre,2,fillv=0) # 因子打分延后2期
# ========================下面显示合成因子 ========================
# w = 1 持股周期。模拟实际方式来测试因子有效性衰减.
deme = demeaned(ford,1) # 收益率去均值
cutr = standard(scre,ford,q=5,w=1)[-1] # 生成因子分层(返回标准化后的因子)
mask = np.isfinite(ford) & np.isfinite(scre)
# ...................
# 因子分组收益
rets = bincount(cutr,1,ford)[:,0:-1]/bincount(cutr,1,mask)[:,0:-1]
plt.figure(figsize=(12,3))
_ = plt.plot(np.nancumprod(rets+1, 0)-1)
# 因子超额收益
rets = bincount(cutr,1,deme)[:,0:-1]/bincount(cutr,1,mask)[:,0:-1]
plt.figure(figsize=(12,3))
_ = plt.plot(np.nancumprod(rets+1, 0)-1)
# 因子24期滚动收益
rets = pd.rolling_sum(pd.DataFrame(rets),window=24,min_periods = 1)
plt.figure(figsize=(12,3))
_ = plt.plot(rets)
from numpy.core.umath_tests import inner1d
def pearsonr(A,B,axis=1,rank=False,pct=False):
A, B = np.asarray(A), np.asarray(B); ndis = np.arange(A.ndim)
axis = 0 if axis is None else (ndis[axis] if axis<0 else axis)
if rank:
A = argsort(A,axis,rank=rank,pct=pct,dtype=float)[1]
B = argsort(B,axis,rank=rank,pct=pct,dtype=float)[1]
if pct:
A /= np.expand_dims(A.max(axis),axis)
B /= np.expand_dims(B.max(axis),axis)
Af = A-np.expand_dims(A.mean(axis),axis)
Bf = B-np.expand_dims(B.mean(axis),axis)
if axis == ndis[-1]: # -1轴有一倍加速
mult = inner1d(Af,Bf)
diff = np.sqrt(inner1d(Af,Af)*inner1d(Bf,Bf))
else:
mult = (Af*Bf).sum(axis)
diff = np.sqrt((Af**2).sum(axis)*(Bf**2).sum(axis))
return mult/diff
def bincount(self,axis=None,weights=None,minlength=None):
if axis is None: view = np.ravel(np.asarray(self))
else: view = np.asarray(self)
mask = np.isfinite(view)
if not weights is None:
mask &= np.isfinite(weights); weights = np.asarray(weights)[mask]
if view.ndim==1: out = np.bincount(view[mask],weights=weights,minlength = minlength)
else:
m = view.shape[1-axis]; n = (view[mask].max()+1)*np.arange(m)
if minlength is None: minlength = m*(view[mask].max().astype(int)+1)
if axis!=1: out = np.bincount((view+n)[mask].astype(int),weights=weights,minlength=minlength).reshape(m,-1).T
else: out = np.bincount((view+n[:,None])[mask].astype(int),weights=weights,minlength=minlength).reshape(m,-1)
return out
def demeaned(self,axis=0,func=np.nanmean):
''' 指定轴向去掉均值 '''
view = np.asarray(self)
with warnings.catch_warnings():
warnings.filterwarnings("ignore","Mean of empty slice")
if axis!=0:
return self-func(view,axis)[:,None]
else: return self-func(view,axis)
def shift(self,num,fillv=np.nan,axis=0):
''' 通用滚动函数
self : 1d/2d array
axis : 支持轴向选择 0 or 1
num : int 行或列轴方向移动
'''
result = np.empty_like(self)
axs = 1 if axis==0 else np.s_[0:]
if num > 0:
result[np.s_[:,:num][axs]] = fillv
result[np.s_[:,num:][axs]] = self[np.s_[:,:-num][axs]]
elif num < 0:
result[np.s_[:,num:][axs]] = fillv
result[np.s_[:,:num][axs]] = self[np.s_[:,-num:][axs]]
else: result[:] = self
return result
def pct_change(x,N=1,axis=0,extre=False):
with warnings.catch_warnings():
warnings.filterwarnings("ignore","invalid value encountered in (less|multiply|divide|greater)")
warnings.filterwarnings('ignore', r'All-NaN (slice|axis) encountered')
if N>0:
rets = x/shift(np.asarray(x),N,axis=axis)-1
elif N<0:
rets = shift(np.asarray(x),N,axis=axis)/x-1
if extre: # 去极值
rets = winsorize(rets,qrange=[0.05,0.93],inclusive=True,axis=axis)
return rets
def fcut(self,q,axis=-1,asc=False,dtype=np.int16):
''' 快速版本分位数分箱函数
------------------
self : 1d/2d 数组
axis : 0 or 1
q : 分箱层数
asc : 分箱顺序反向(支持输入信号表,分期对因子方向进行调整)
------------------
out: 与 pd.qcut 输出略有差异,另外Nan的值为>=q
'''
if asc:
view = np.asarray(self)
else: view = 0-np.asarray(self)
mask = np.isfinite(view)
cout = view.shape[axis]; step = (1.0/q)*cout # 计算分位平均间隔
inds = argsort(view,axis=axis,rank=True)[1] # 取得顺序号
caxs = np.expand_dims(mask.sum(axis),axis)-1.0 # 取得轴向最大数量
inds = inds/caxs*(view.shape[axis]-1) # 去除无效值影响
icut = (inds/step).astype(dtype)
icut[~mask]=q
return icut
def argsort(self,axis=-1,asc=True,rank=False,pct=False,kind=None,fillv=None,dtype=np.int16):
""" numpy - np.argsort 及扩展, 可扩展至多维。
Parameters
----------
self : 数组数据
axis : 轴向选择
asc : 顺序或者倒序
rank : 获取顺序排名(顺序号), 默认'mergesort' 为排序选择
pct : 百分比排名或(pct>1: 标准分排名,需指定 pct总数)
Returns:
----------
idx : 如果未选排名返回索引,同 np.argsort
res : 如果选择排名输出含排名
另: 如果存在无效值,无效位上的索引或排名不能确定. 需要另行处理 """
if asc: view = np.asarray(self)
else: view = -np.asarray(self)
if rank|(pct!=0):
rank = True; mask = np.isfinite(view)
if kind is None: kind = 'mergesort'
res = np.empty(view.shape, dtype=dtype)
I = np.ogrid[tuple(map(slice,view.shape))]
idx = view.argsort(axis=axis,kind=kind)
rng,I[axis]=I[axis],idx; res[I]=rng
if pct:
emk = expand_dims(mask.sum(axis),axis)-1.0
emk[emk==0] = .00000001; ran = res / emk
if pct>1: ran = np.rint(ran*(pct-1)).astype(dtype)
else: ran = res
if fillv!=None: ran[~mask] = fillv
else: ran[~mask] = pct
return idx.astype(dtype),ran
if kind is None: kind = 'quicksort'
return view.argsort(axis=axis,kind=kind).astype(dtype)
def standard(factor,other=None,q=5,w=1,asc=False,ac=True):
""" 因子标准化流程函数(输出与输入尺寸相同)
Parameters
----------
factor : 数组, 因子数据
other : 数组/列表/元组 其他可能参与的数据,用于收集无效值标记
q : 分位数分箱数
w : 周期间隔数,指定间隔内组合保持不变,近似实际操作
fast : 快速分位数计算,可选标准
Returns:
----------
输出:处理后因子,分组表
"""
mask = np.isfinite(factor)
if not other is None:
if isinstance(other,(list,tuple)):
for o in other: # 遍历可能参数收集无效值标记
mask &= np.isfinite(o)
else: mask &= np.isfinite(other)
dayx = np.arange(factor.shape[0]); rows = mask.any(1)
if w > 1:# =================== 因子跨周期计算,近似模拟操作 ===========================
equl,reps = np.unique(dayx[rows]//w,True,return_counts=True)[1:]
temp = factor[equl]; invd = mask[equl] # 跨周期分割
else: temp = factor[rows]; invd = mask[rows]
# =================== 标记无效元素,防止无效元素参与标准化及数字化计算 ===================
try: temp[~invd] = np.nan
except:temp = temp.astype(float); temp[~invd] = np.nan
# ============================== 因子标准化,数字化处理 ==============================
if ac:
temp = winsorize(temp, qrange=[0.05,0.93], inclusive=True,axis=1) # 因子去极值
temp = standardlize(temp,axis=1) # 因子标准化
cuts = fcut(temp,q,axis=1,asc=asc,dtype=np.uint8)
icut = np.empty(factor.shape,dtype=np.int8) # 生成空数组
ifac = np.empty(factor.shape,dtype=factor.dtype) # 生成空数组
if w > 1:
icut[rows] = np.asarray(cuts).repeat(reps,axis=0) # 跨周期复原
ifac[rows] = np.asarray(temp).repeat(reps,axis=0) # 跨周期复原
else: icut[rows] = cuts; ifac[rows] = temp
icut[~mask] = q; ifac[~mask] = np.nan # 超边界箱号
return ifac,icut
def hist(self,bins=33,ranges=None,facecolor=None,rwidth=0.8,normed=True,alpha=0.95):
fig = plt.figure(figsize=(6, 3))
order = (self,) if isinstance(self,np.ndarray) else self
for i,f in enumerate(order):
ht = np.asarray(f).ravel();mask = np.isfinite(ht)
if ranges is None:
ranges = tuple([np.min(ht[mask]),np.max(ht[mask])])
if facecolor is None:
plt.hist(ht[mask],bins=bins,range=ranges,histtype='bar',rwidth=rwidth,normed=normed,alpha=alpha)
else:
plt.hist(ht[mask],bins=bins,range=ranges,facecolor=facecolor,histtype='bar',rwidth=rwidth,normed=normed,alpha=alpha)
plt.grid(True ,axis= 'both', which='major', linestyle = "-.", color = "black", linewidth = 0.5, alpha=.22)
plt.show()