请 [注册] 或 [登录]  | 返回主站

量化交易吧 /  数理科学 帖子:3353364 新帖:10

单因子测试框架

特朗普对头发表于:5 月 10 日 07:04回复(1)

本帖子主要对单因子的有效性进行分析,主要实现功能如下所示:
因子收益率显著性检验
      主要通过T检验分析;根据APT模型,对过去3年(时间长度可修改)的数据进行进行多元线性回归,从而得到需要分析的因子收益率的t值,然后进行以下两个方面的分析:
       t值绝对值序列的均值:之所以要取绝对值,是因为只要t值显著不等于0即可以认为在当期,因子和收 益率存在明显的相关性。但是这种相关性有的时候为正,有的时候为负,如果不取绝对值,则很多正负抵消,会低估因子的有效性;
       t值绝对值序列大于2的比例:检验|t| > 2的比例主要是为了保证|t|平均值的稳定性, 避免出现少数数值特别大的样本值拉高均值。
因子IC分析
      因子k的 IC 值一般是指个股第T期在因子k上的暴露度与T   1期的收益率的相关系数。当得到因子 IC 值序列后,我们可以仿照上一小节t检验的分析方法进行计算:
     IC 值序列的均值及绝对值均值:判断因子有效性;
     IC 值序列的标准差:判断因子稳定性;
     IC 值序列大于零(或小于零)的占比:判断因子效果的一致性。
分层回测
      依照因子值对股票进行打分,构建投资组合回测,是最直观的衡量指标优劣的手段。具体来说,在某个截面期上,可以根据因子值对个股进行打分,将所有个股依照分数进行排序,然后分为N个投资组合,进行回测。
构建方法详细说明如下:
     股票池、回溯区间、截面期(换仓期)可均与动态调整;
     选取一个基准组合(比如沪深300),将所有个股按照得分进行排序,按得分从高到低分成将所有个股按照得分进行排序,按得分从高到低分成N个组合,每个组合中的股票等权配比。
     评价方法:回测年化收益率、年化波动率、夏普比率、最大回撤、胜率等。
     一般来说,对于比较有效的因子(如市净率),分成3~5层进行回测,各个投资组合的最终净值一般可以保序。分成N层(N > 5)进行回测时,可以用最终净值的秩相关系数来衡量 因子的优劣(秩相关系数的绝对值越接近1时效果越好)。
    可调整参数:
    回测区间:开始时间及结束时间
    股票池:HS300、ZZ500、ZZ800、全A股、创业板指、中小板指
    因子:可直接输入API文档中财务中的因子代码
    数据频率:周、月、季
    分层回测调仓周期、分层数等
    其中,回测中代码根据量化课堂修改,实现了参数的动态调节,研究代码为整个单因子测试过程,其中excel文档可通过以下链接下载:

# coding: utf-8# In[ ]:from jqdata import *import pandas as pdfrom pandas import Series, DataFrameimport numpy as npfrom datetime import datetime,timedeltaimport timefrom sklearn import preprocessingfrom scipy.stats import mstatsimport scipy.stats as stimport seaborn as snsimport calendarimport statsmodels.api as smimport matplotlib.pylab as pltimport xlrdimport itertoolsimport copyimport pickleimport warnings# 定义类'参数分析'class parameter_analysis(object):  # 定义函数中不同的变量def __init__(self, algorithm_id=None):self.algorithm_id = algorithm_id            # 回测idself.params_df = pd.DataFrame()             # 回测中所有调参备选值的内容,列名字为对应修改面两名称,对应回测中的 g.XXXXself.results = {}                           # 回测结果的回报率,key 为 params_df 的行序号,value 为self.evaluations = {}                       # 回测结果的各项指标,key 为 params_df 的行序号,value 为一个 dataframeself.backtest_ids = {}                      # 回测结果的 id# 新加入的基准的回测结果 id,可以默认为空 '',则使用回测中设定的基准self.benchmark_id = ''                      
        self.returns = {}                           # 记录所有回报率self.excess_returns = {}                    # 记录超额收益率self.log_returns = {}                       # 记录收益率的 log 值self.log_excess_returns = {}                # 记录超额收益的 log 值self.dates = []                             # 回测对应的所有日期self.excess_max_drawdown = {}               # 计算超额收益的最大回撤self.excess_annual_return = {}              # 计算超额收益率的年化指标self.evaluations_df = pd.DataFrame()        # 记录各项回测指标,除日回报率外   # 定义排队运行多参数回测函数def run_backtest(self,                          # algorithm_id=None,             # 回测策略id running_max=10,                # 回测中同时巡行最大回测数量 start_date='2010-01-01',       # 回测的起始日期 end_date='2017-01-01',         # 回测的结束日期 frequency='day',               # 回测的运行频率 initial_cash='10000000',        # 回测的初始持仓金额 param_names=[],                # 回测中调整参数涉及的变量 param_values=[]                # 回测中每个变量的备选参数值 ):# 当此处回测策略的 id 没有给出时,调用类输入的策略 idif algorithm_id == None: algorithm_id=self.algorithm_id     # 生成所有参数组合并加载到 df 中# 包含了不同参数具体备选值的排列组合中一组参数的 tuple 的 listparam_combinations = list(itertools.product(*param_values))# 生成一个 dataframe, 对应的列为每个调参的变量,每个值为调参对应的备选值to_run_df = pd.DataFrame(param_combinations)# 修改列名称为调参变量的名字to_run_df.columns = param_names      # 设定运行起始时间和保存格式start = time.time()# 记录结束的运行回测finished_backtests = {}# 记录运行中的回测running_backtests = {}# 计数器pointer = 0# 总运行回测数目,等于排列组合中的元素个数total_backtest_num = len(param_combinations)# 记录回测结果的回报率all_results = {}# 记录回测结果的各项指标all_evaluations = {}       # 在运行开始时显示print('【已完成|运行中|待运行】:') # 当运行回测开始后,如果没有全部运行完全的话:while len(finished_backtests)<total_backtest_num:# 显示运行、完成和待运行的回测个数print('[%s|%s|%s].' % (len(finished_backtests), 
                                   len(running_backtests), 
                                   (total_backtest_num-len(finished_backtests)-len(running_backtests)) )),# 记录当前运行中的空位数量to_run = min(running_max-len(running_backtests), total_backtest_num-len(running_backtests)-len(finished_backtests))# 把可用的空位进行跑回测for i in range(pointer, pointer+to_run):# 备选的参数排列组合的 df 中第 i 行变成 dict,每个 key 为列名字,value 为 df 中对应的值params = to_run_df.ix[i].to_dict()# 记录策略回测结果的 id,调整参数 extras 使用 params 的内容backtest = create_backtest(algorithm_id = algorithm_id,   start_date = start_date, 
                                           end_date = end_date, 
                                           frequency = frequency, 
                                           initial_cash = initial_cash, 
                                           extras = params, 
                                           # 再回测中把改参数的结果起一个名字,包含了所有涉及的变量参数值   name = str(params)   )# 记录运行中 i 回测的回测 idrunning_backtests[i] = backtest# 计数器计数运行完的数量    pointer = pointer+to_run    # 获取回测结果failed = []finished = []# 对于运行中的回测,key 为 to_run_df 中所有排列组合中的序数for key in running_backtests.keys():# 研究调用回测的结果,running_backtests[key] 为运行中保存的结果 idbt = get_backtest(running_backtests[key])# 获得运行回测结果的状态,成功和失败都需要运行结束后返回,如果没有返回则运行没有结束status = bt.get_status()# 当运行回测失败if status == 'failed':# 失败 list 中记录对应的回测结果 idfailed.append(key)# 当运行回测成功时elif status == 'done':# 成功 list 记录对应的回测结果 id,finish 仅记录运行成功的finished.append(key)# 回测回报率记录对应回测的回报率 dict, key to_run_df 中所有排列组合中的序数, value 为回报率的 dict# 每个 value 一个 list 每个对象为一个包含时间、日回报率和基准回报率的 dictall_results[key] = bt.get_results()# 回测回报率记录对应回测结果指标 dict, key to_run_df 中所有排列组合中的序数, value 为回测结果指标的 dataframeall_evaluations[key] = bt.get_risk()# 记录运行中回测结果 id 的 list 中删除失败的运行for key in failed:running_backtests.pop(key)# 在结束回测结果 dict 中记录运行成功的回测结果 id,同时在运行中的记录中删除该回测for key in finished:finished_backtests[key] = running_backtests.pop(key)# 当一组同时运行的回测结束时报告时间if len(finished_backtests) != 0 and len(finished_backtests) % running_max == 0 and to_run !=0:# 记录当时时间middle = time.time()# 计算剩余时间,假设没工作量时间相等的话remain_time = (middle - start) * (total_backtest_num - len(finished_backtests)) / len(finished_backtests)# print 当前运行时间print('[已用%s时,尚余%s时,请不要关闭浏览器].' % (str(round((middle - start) / 60.0 / 60.0,3)), 
                                          str(round(remain_time / 60.0 / 60.0,3)))),# 5秒钟后再跑一下time.sleep(5) # 记录结束时间end = time.time() print('')print('【回测完成】总用时:%s秒(即%s小时)。' % (str(int(end-start)), 
                                           str(round((end-start)/60.0/60.0,2)))),# 对应修改类内部对应self.params_df = to_run_dfself.results = all_resultsself.evaluations = all_evaluationsself.backtest_ids = finished_backtests        #7 最大回撤计算方法def find_max_drawdown(self, returns):# 定义最大回撤的变量result = 0# 记录最高的回报率点historical_return = 0# 遍历所有日期for i in range(len(returns)):# 最高回报率记录historical_return = max(historical_return, returns[i])# 最大回撤记录drawdown = 1-(returns[i] + 1) / (historical_return + 1)# 记录最大回撤result = max(drawdown, result)# 返回最大回撤值return result# log 收益、新基准下超额收益和相对与新基准的最大回撤def organize_backtest_results(self, benchmark_id=None):# 若新基准的回测结果 id 没给出if benchmark_id==None:# 使用默认的基准回报率,默认的基准在回测策略中设定self.benchmark_returns = [x['benchmark_returns'] for x in self.results[0]]# 当新基准指标给出后    else:# 基准使用新加入的基准回测结果self.benchmark_returns = [x['returns'] for x in get_backtest(benchmark_id).get_results()]# 回测日期为结果中记录的第一项对应的日期self.dates = [x['time'] for x in self.results[0]]# 对应每个回测在所有备选回测中的顺序 (key),生成新数据# 由 {key:{u'benchmark_returns': 0.022480100091729405,#           u'returns': 0.03184566700000002,#           u'time': u'2006-02-14'}} 格式转化为:# {key: []} 格式,其中 list 为对应 date 的一个回报率 listfor key in self.results.keys():self.returns[key] = [x['returns'] for x in self.results[key]]# 生成对于基准(或新基准)的超额收益率for key in self.results.keys():self.excess_returns[key] = [(x+1)/(y+1)-1 for (x,y) in zip(self.returns[key], self.benchmark_returns)]# 生成 log 形式的收益率for key in self.results.keys():self.log_returns[key] = [log(x+1) for x in self.returns[key]]# 生成超额收益率的 log 形式for key in self.results.keys():self.log_excess_returns[key] = [log(x+1) for x in self.excess_returns[key]]# 生成超额收益率的最大回撤for key in self.results.keys():self.excess_max_drawdown[key] = self.find_max_drawdown(self.excess_returns[key])# 生成年化超额收益率for key in self.results.keys():self.excess_annual_return[key] = (self.excess_returns[key][-1]+1)**(252./float(len(self.dates)))-1# 把调参数据中的参数组合 df 与对应结果的 df 进行合并self.evaluations_df = pd.concat([self.params_df, pd.DataFrame(self.evaluations).T], axis=1)#         self.evaluations_df = # 获取最总分析数据,调用排队回测函数和数据整理的函数    def get_backtest_data(self,  algorithm_id=None,                         # 回测策略id  benchmark_id=None,                         # 新基准回测结果id  file_name='results.pkl',                   # 保存结果的 pickle 文件名字  running_max=10,                            # 最大同时运行回测数量  start_date='2006-01-01',                   # 回测开始时间  end_date='2016-11-30',                     # 回测结束日期  frequency='day',                           # 回测的运行频率  initial_cash='1000000',                    # 回测初始持仓资金  param_names=[],                            # 回测需要测试的变量  param_values=[]                            # 对应每个变量的备选参数  ):# 调运排队回测函数,传递对应参数self.run_backtest(algorithm_id=algorithm_id,  running_max=running_max,  start_date=start_date,  end_date=end_date,  frequency=frequency,  initial_cash=initial_cash,  param_names=param_names,  param_values=param_values  )# 回测结果指标中加入 log 收益率和超额收益率等指标self.organize_backtest_results(benchmark_id)# 生成 dict 保存所有结果。results = {'returns':self.returns,   'excess_returns':self.excess_returns,   'log_returns':self.log_returns,   'log_excess_returns':self.log_excess_returns,   'dates':self.dates,   'benchmark_returns':self.benchmark_returns,   'evaluations':self.evaluations,   'params_df':self.params_df,   'backtest_ids':self.backtest_ids,   'excess_max_drawdown':self.excess_max_drawdown,   'excess_annual_return':self.excess_annual_return,   'evaluations_df':self.evaluations_df}# 保存 pickle 文件pickle_file = open(file_name, 'wb')pickle.dump(results, pickle_file)pickle_file.close()# 读取保存的 pickle 文件,赋予类中的对象名对应的保存内容    def read_backtest_data(self, file_name='results.pkl'):pickle_file = open(file_name, 'rb')results = pickle.load(pickle_file)self.returns = results['returns']self.excess_returns = results['excess_returns']self.log_returns = results['log_returns']self.log_excess_returns = results['log_excess_returns']self.dates = results['dates']self.benchmark_returns = results['benchmark_returns']self.evaluations = results['evaluations']self.params_df = results['params_df']self.backtest_ids = results['backtest_ids']self.excess_max_drawdown = results['excess_max_drawdown']self.excess_annual_return = results['excess_annual_return']self.evaluations_df = results['evaluations_df']# 回报率折线图    def plot_returns(self):# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;fig = plt.figure(figsize=(20,8))ax = fig.add_subplot(110)# 作图for key in self.returns.keys():ax.plot(range(len(self.returns[key])), self.returns[key], label=key)# 设定benchmark曲线并标记ax.plot(range(len(self.benchmark_returns)), self.benchmark_returns, label='benchmark', c='k', linestyle='') ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]plt.xticks(ticks, [self.dates[i] for i in ticks])# 设置图例样式ax.legend(loc = 2, fontsize = 10)# 设置y标签样式ax.set_ylabel('returns',fontsize=20)# 设置x标签样式ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])# 设置图片标题样式ax.set_title("Strategy's performances with different parameters", fontsize=21)plt.xlim(0, len(self.returns[0]))# 超额收益率图    def plot_excess_returns(self):# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;fig = plt.figure(figsize=(20,8))ax = fig.add_subplot(110)# 作图for key in self.returns.keys():ax.plot(range(len(self.excess_returns[key])), self.excess_returns[key], label=key)# 设定benchmark曲线并标记ax.plot(range(len(self.benchmark_returns)), [0]*len(self.benchmark_returns), label='benchmark', c='k', linestyle='')ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]plt.xticks(ticks, [self.dates[i] for i in ticks])# 设置图例样式ax.legend(loc = 2, fontsize = 10)# 设置y标签样式ax.set_ylabel('excess returns',fontsize=20)# 设置x标签样式ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])# 设置图片标题样式ax.set_title("Strategy's performances with different parameters", fontsize=21)plt.xlim(0, len(self.excess_returns[0]))# log回报率图    def plot_log_returns(self):# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;fig = plt.figure(figsize=(20,8))ax = fig.add_subplot(110)# 作图for key in self.returns.keys():ax.plot(range(len(self.log_returns[key])), self.log_returns[key], label=key)# 设定benchmark曲线并标记ax.plot(range(len(self.benchmark_returns)), [log(x+1) for x in self.benchmark_returns], label='benchmark', c='k', linestyle='')ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]plt.xticks(ticks, [self.dates[i] for i in ticks])# 设置图例样式ax.legend(loc = 2, fontsize = 10)# 设置y标签样式ax.set_ylabel('log returns',fontsize=20)# 设置图片标题样式ax.set_title("Strategy's performances with different parameters", fontsize=21)plt.xlim(0, len(self.log_returns[0]))# 超额收益率的 log 图def plot_log_excess_returns(self):# 通过figsize参数可以指定绘图对象的宽度和高度,单位为英寸;fig = plt.figure(figsize=(20,8))ax = fig.add_subplot(110)# 作图for key in self.returns.keys():ax.plot(range(len(self.log_excess_returns[key])), self.log_excess_returns[key], label=key)# 设定benchmark曲线并标记ax.plot(range(len(self.benchmark_returns)), [0]*len(self.benchmark_returns), label='benchmark', c='k', linestyle='')ticks = [int(x) for x in np.linspace(0, len(self.dates)-1, 11)]plt.xticks(ticks, [self.dates[i] for i in ticks])# 设置图例样式ax.legend(loc = 2, fontsize = 10)# 设置y标签样式ax.set_ylabel('log excess returns',fontsize=20)# 设置图片标题样式ax.set_title("Strategy's performances with different parameters", fontsize=21)plt.xlim(0, len(self.log_excess_returns[0]))# 回测的4个主要指标,包括总回报率、最大回撤夏普率和波动def get_eval4_bar(self, sort_by=[]): 
        sorted_params = self.params_dffor by in sort_by:sorted_params = sorted_params.sort(by)indices = sorted_params.indexfig = plt.figure(figsize=(20,7))# 定义位置ax1 = fig.add_subplot(221)# 设定横轴为对应分位,纵轴为对应指标ax1.bar(range(len(indices)), [self.evaluations[x]['algorithm_return'] for x in indices], 0.6, label = 'Algorithm_return')plt.xticks([x+0.3 for x in range(len(indices))], indices)# 设置图例样式ax1.legend(loc='best',fontsize=15)# 设置y标签样式ax1.set_ylabel('Algorithm_return', fontsize=15)# 设置y标签样式ax1.set_yticklabels([str(x*100)+'% 'for x in ax1.get_yticks()])# 设置图片标题样式ax1.set_title("Strategy's of Algorithm_return performances of different quantile", fontsize=15)# x轴范围plt.xlim(0, len(indices))# 定义位置ax2 = fig.add_subplot(224)# 设定横轴为对应分位,纵轴为对应指标ax2.bar(range(len(indices)), [self.evaluations[x]['max_drawdown'] for x in indices], 0.6, label = 'Max_drawdown')plt.xticks([x+0.3 for x in range(len(indices))], indices)# 设置图例样式ax2.legend(loc='best',fontsize=15)# 设置y标签样式ax2.set_ylabel('Max_drawdown', fontsize=15)# 设置x标签样式ax2.set_yticklabels([str(x*100)+'% 'for x in ax2.get_yticks()])# 设置图片标题样式ax2.set_title("Strategy's of Max_drawdown performances of different quantile", fontsize=15)# x轴范围plt.xlim(0, len(indices))# 定义位置ax3 = fig.add_subplot(223)# 设定横轴为对应分位,纵轴为对应指标ax3.bar(range(len(indices)),[self.evaluations[x]['sharpe'] for x in indices], 0.6, label = 'Sharpe')plt.xticks([x+0.3 for x in range(len(indices))], indices)# 设置图例样式ax3.legend(loc='best',fontsize=15)# 设置y标签样式ax3.set_ylabel('Sharpe', fontsize=15)# 设置x标签样式ax3.set_yticklabels([str(x*100)+'% 'for x in ax3.get_yticks()])# 设置图片标题样式ax3.set_title("Strategy's of Sharpe performances of different quantile", fontsize=15)# x轴范围plt.xlim(0, len(indices))# 定义位置ax4 = fig.add_subplot(222)# 设定横轴为对应分位,纵轴为对应指标ax4.bar(range(len(indices)), [self.evaluations[x]['algorithm_volatility'] for x in indices], 0.6, label = 'Algorithm_volatility')plt.xticks([x+0.3 for x in range(len(indices))], indices)# 设置图例样式ax4.legend(loc='best',fontsize=15)# 设置y标签样式ax4.set_ylabel('Algorithm_volatility', fontsize=15)# 设置x标签样式ax4.set_yticklabels([str(x*100)+'% 'for x in ax4.get_yticks()])# 设置图片标题样式ax4.set_title("Strategy's of Algorithm_volatility performances of different quantile", fontsize=15)# x轴范围plt.xlim(0, len(indices))#14 年化回报和最大回撤,正负双色表示def get_eval(self, sort_by=[]):sorted_params = self.params_dffor by in sort_by:sorted_params = sorted_params.sort(by)indices = sorted_params.index# 大小fig = plt.figure(figsize = (20, 8))# 图1位置ax = fig.add_subplot(110)# 生成图超额收益率的最大回撤ax.bar([x+0.3 for x in range(len(indices))],   [-self.evaluations[x]['max_drawdown'] for x in indices], color = '#32CD32',  
                     width = 0.6, label = 'Max_drawdown', zorder=10)# 图年化超额收益ax.bar([x for x in range(len(indices))],   [self.evaluations[x]['annual_algo_return'] for x in indices], color = 'r', 
                     width = 0.6, label = 'Annual_return')plt.xticks([x+0.3 for x in range(len(indices))], indices)# 设置图例样式ax.legend(loc='best',fontsize=15)# 基准线plt.plot([0, len(indices)], [0, 0], c='k', 
                 linestyle='', label='zero')# 设置图例样式ax.legend(loc='best',fontsize=15)# 设置y标签样式ax.set_ylabel('Max_drawdown', fontsize=15)# 设置x标签样式ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])# 设置图片标题样式ax.set_title("Strategy's performances of different quantile", fontsize=15)#   设定x轴长度plt.xlim(0, len(indices))#14 超额收益的年化回报和最大回撤# 加入新的benchmark后超额收益和def get_excess_eval(self, sort_by=[]):sorted_params = self.params_dffor by in sort_by:sorted_params = sorted_params.sort(by)indices = sorted_params.index# 大小fig = plt.figure(figsize = (20, 8))# 图1位置ax = fig.add_subplot(110)# 生成图超额收益率的最大回撤ax.bar([x+0.3 for x in range(len(indices))],   [-self.excess_max_drawdown[x] for x in indices], color = '#32CD32',  
                     width = 0.6, label = 'Excess_max_drawdown')# 图年化超额收益ax.bar([x for x in range(len(indices))],   [self.excess_annual_return[x] for x in indices], color = 'r', 
                     width = 0.6, label = 'Excess_annual_return')plt.xticks([x+0.3 for x in range(len(indices))], indices)# 设置图例样式ax.legend(loc='best',fontsize=15)# 基准线plt.plot([0, len(indices)], [0, 0], c='k', 
                 linestyle='', label='zero')# 设置图例样式ax.legend(loc='best',fontsize=15)# 设置y标签样式ax.set_ylabel('Max_drawdown', fontsize=15)# 设置x标签样式ax.set_yticklabels([str(x*100)+'% 'for x in ax.get_yticks()])# 设置图片标题样式ax.set_title("Strategy's performances of different quantile", fontsize=15)#   设定x轴长度plt.xlim(0, len(indices))def get_sheetname(factor):data = xlrd.open_workbook('因子数据.xlsx')d_data=[]for i in range(5):d=[]sheet = data.sheet_by_index(i)for j in range(1,sheet.nrows):d.append(sheet.row_values(j)[0])d_data.append(d)for i in range(5):if factor in d_data[i]:return data.sheet_by_index(i).name#获取指定周期的日期列表 'W、M、Q'def get_period_date(peroid,start_date, end_date):#设定转换周期period_type  转换为周是'W',月'M',季度线'Q',五分钟'5min',12天'12D'stock_data = get_price('000001.XSHE',start_date,end_date,'daily',fields=['close'])#记录每个周期中最后一个交易日stock_data['date']=stock_data.index#进行转换,周线的每个变量都等于那一周中最后一个交易日的变量值period_stock_data=stock_data.resample(peroid,how='last')date=period_stock_data.indexpydate_array = date.to_pydatetime()date_only_array = np.vectorize(lambda s: s.strftime('%Y-%m-%d'))(pydate_array )date_only_series = pd.Series(date_only_array)start_date = datetime.strptime(start_date, "%Y-%m-%d")start_date=start_date-timedelta(days=1)start_date = start_date.strftime("%Y-%m-%d")date_list=date_only_series.values.tolist()date_list.insert(0,start_date)return date_list#去除上市距beginDate不足1年且退市在endDate之后的股票def delect_stop(stocks,beginDate,endDate,n=365):stockList=[]beginDate = datetime.strptime(beginDate, "%Y-%m-%d")endDate = datetime.strptime(endDate, "%Y-%m-%d")for stock in stocks:start_date=get_security_info(stock).start_dateend_date=get_security_info(stock).end_dateif start_date<(beginDate-timedelta(days=n)).date() and end_date>endDate.date():stockList.append(stock)return stockList#获取股票池def get_stock(stockPool,begin_date,end_date):if stockPool=='HS300':stockList=get_index_stocks('000300.XSHG',begin_date)elif stockPool=='ZZ500':stockList=get_index_stocks('399905.XSHE',begin_date)elif stockPool=='ZZ800':stockList=get_index_stocks('399906.XSHE',begin_date)   elif stockPool=='CYBZ':stockList=get_index_stocks('399006.XSHE',begin_date)elif stockPool=='ZXBZ':stockList=get_index_stocks('399005.XSHE',begin_date)elif stockPool=='A':stockList=get_index_stocks('000002.XSHG',begin_date)+get_index_stocks('399107.XSHE',begin_date)#剔除ST股st_data=get_extras('is_st',stockList, count = 1,end_date=begin_date)stockList = [stock for stock in stockList if not st_data[stock][0]]#剔除停牌、新股及退市股票stockList=delect_stop(stockList,begin_date,end_date)return stockListdef factor_t_test(factor,begin_date,end_date,stockPool,peroid):warnings.filterwarnings("ignore")date_period=get_period_date(peroid,begin_date,end_date)#获取行业数据industry_code=['HY001','HY002','HY003','HY004','HY005','HY006','HY007','HY008','HY009','HY010','HY011']valueFactor=['pe_ratio','pb_ratio','ps_ratio','pcf_ratio']#取股票池stockList=get_stock(stockPool,begin_date,end_date)#获取因子str_factor=get_sheetname(factor)+'.'+factorstr_factor=eval(str_factor)  WLS_params = {}WLS_t_test = {}for date in date_period[:-1]:#获取横截面收益率df_close=get_price(stockList, date, date_period[date_period.index(date)+1], 'daily', ['close'])if df_close.empty:continuedf_pchg=df_close['close'].iloc[-1,:]/df_close['close'].iloc[0,:]-1R_T = get_fundamentals(query(valuation.code,valuation.circulating_market_cap).filter(valuation.code.in_(stockList)), date)R_T.set_index('code',inplace=True, drop=False)R_T['Weight']=np.sqrt(R_T.circulating_market_cap)# HS300指数收益率index_close=get_price('000300.XSHG', date, date_period[date_period.index(date)+1], 'daily', ['close'])index_pchg=index_close['close'].iloc[-1]/index_close['close'].iloc[0]-1R_T['pchg']=df_pchg-index_pchg#获取因子数据factor_fata = get_fundamentals(query(valuation.code,str_factor).filter(valuation.code.in_(stockList)), date)if factor in valueFactor:factor_fata[factor]=1/factor_fata[factor]#中位数处理极值factor_fata[factor] = mstats.winsorize(factor_fata[factor], limits=0.025) #取置信区间95% #数据标准化factor_fata[factor] = preprocessing.scale(factor_fata[factor])factor_fata.set_index('code',inplace=True, drop=False)#获取行业暴露度、哑变量矩阵Linear_Regression = pd.DataFrame()for i in industry_code:i_Constituent_Stocks = get_industry_stocks(i, date)i_Constituent_Stocks = list(set(i_Constituent_Stocks).intersection(set(stockList)))try:tmp = factor_fata.loc[i_Constituent_Stocks]tmp.dropna(inplace=True)tmp[i]=1.0except:print(i)Linear_Regression = Linear_Regression.append(tmp)Linear_Regression.fillna(0.0, inplace=True)Linear_Regression = pd.merge(Linear_Regression, R_T.loc[:,['code','pchg','Weight']], on='code')Linear_Regression=Linear_Regression.dropna()X = Linear_Regression.loc[:,industry_code+[factor]]y = Linear_Regression['pchg']   # WLS回归wls = sm.WLS(y, X, weights=Linear_Regression.Weight)result = wls.fit()WLS_params[date] = result.params[-1]WLS_t_test[date] = result.tvalues[-1]    t_test = pd.Series(WLS_t_test)print 't值序列绝对值平均值',np.sum(np.abs(t_test.values))/len(t_test)n = [x for x in t_test.values if np.abs(x)>2]print 't值序列绝对值大于2的占比——判断因子的显著性是否稳定',len(n)/float(len(t_test))print 't值序列均值的绝对值除以t值序列的标准差',np.abs(t_test.mean())/t_test.std()t_test.plot('bar',figsize=(16,5))return t_test#因子IC分析def factor_IC_analysis(factor,begin_date,end_date,stockPool,peroid,rule='normal'):  warnings.filterwarnings("ignore")date_period=get_period_date(peroid,begin_date,end_date)#获取行业数据industry_code=['HY001','HY002','HY003','HY004','HY005','HY006','HY007','HY008','HY009','HY010','HY011']valueFactor=['pe_ratio','pb_ratio','ps_ratio','pcf_ratio']#取股票池stockList=get_stock(stockPool,begin_date,end_date)#获取因子str_factor=get_sheetname(factor)+'.'+factorstr_factor=eval(str_factor)  IC = {}R_T = pd.DataFrame()for date in date_period[:-1]:#获取横截面收益率df_close=get_price(stockList, date, date_period[date_period.index(date)+1], 'daily', ['close'])if df_close.empty:continuedf_pchg=df_close['close'].iloc[-1,:]/df_close['close'].iloc[0,:]-1R_T['pchg']=df_pchg#获取因子数据factor_data = get_fundamentals(query(valuation.code,str_factor).filter(valuation.code.in_(stockList)), date)if factor in valueFactor:factor_data[factor]=1/factor_data[factor]#中位数处理极值factor_data[factor] = mstats.winsorize(factor_data[factor], limits=0.025) #取置信区间95% #数据标准化factor_data[factor] = preprocessing.scale(factor_data[factor])factor_data.set_index('code',inplace=True, drop=False)R_T[factor]=factor_data[factor]#st.pearsonr(data.periodReturn.rank(), data[factor].rank()) if rank else st.pearsonr(data.periodReturn, data[factor])if rule=='normal':IC[date]=st.pearsonr(R_T.pchg, R_T[factor])[0]elif rule=='rank':IC[date]=st.pearsonr(R_T.pchg.rank(), R_T[factor].rank())[0]IC = pd.Series(IC)print 'IC 值序列的均值大小',IC.mean(),'IC值序列绝对值的均值大小',np.mean(np.abs(IC))print 'IC 值序列的标准差',IC.std()print 'IR 比率(IC值序列均值与标准差的比值)',IC.mean()/IC.std()n = [x for x in IC.values if x>0]print 'IC 值序列大于零的占比',len(n)/float(len(IC))IC.plot('bar',figsize=(16,5))return ICdef group_backtest(factor,start_date,end_date,index,peroid,Group):warnings.filterwarnings("ignore")quantile=tuple(zip(range(0,100,100/Group), range(100/Group,101,100/Group)))    precent=1/float(Group)pa = parameter_analysis()pa.get_backtest_data(file_name = 'results.pkl',  running_max = 10,  algorithm_id = '8e13aae165ab5b091545d0277d5f2b6d',  start_date=start_date,  end_date=end_date,  frequency = 'day',  initial_cash = '10000000',  param_names = ['factor', 'quantile','precent','index','peroid'],  param_values = [[factor], quantile,[precent],[index],[peroid]]                     
                          )pa.read_backtest_data('results.pkl')pa.evaluations_dfpa.plot_returns()pa.plot_excess_returns()pa.get_eval4_bar()pa.get_eval()pa.get_excess_eval()
factor='pb_ratio'begin_date='2015-01-01'end_date='2018-01-01'stockPool='HS300'period='M'stockPeriod=20Group=5
t_test=factor_t_test(factor,begin_date,end_date,stockPool,period)
t值序列绝对值平均值 2.55596522701
t值序列绝对值大于2的占比——判断因子的显著性是否稳定 0.555555555556
t值序列均值的绝对值除以t值序列的标准差 0.251798763238
IC=factor_IC_analysis(factor,begin_date,end_date,stockPool,period)
IC 值序列的均值大小 0.0187058171986 IC值序列绝对值的均值大小 0.166314988774
IC 值序列的标准差 0.205703957899
IR 比率(IC值序列均值与标准差的比值) 0.0909356212185
IC 值序列大于零的占比 0.583333333333
group_backtest(factor,begin_date,end_date,stockPool,stockPeriod,Group)
【已完成|运行中|待运行】:
[0|0|5]. [0|5|0]. [0|5|0]. [0|5|0]. [0|5|0]. [0|5|0]. [0|5|0]. [0|5|0]. [0|5|0]. [0|5|0]. [0|5|0]. [0|5|0]. [0|5|0]. 
【回测完成】总用时:86秒(即0.02小时)。

全部回复

0/140

量化课程

    移动端课程