繁簡切換您正在訪問的是FX168財經網,本網站所提供的內容及信息均遵守中華人民共和國香港特別行政區當地法律法規。

FX168财经网>人物频道>帖子

kdj指标配合accer过滤

作者/1xxxxxxxx 2019-05-10 08:30 0 来源: FX168财经网人物频道

4%的止损,3%的移动止损,小盘股的表现还可以,涨停板的卖出策略还得改,牛股容易卖飞

下面这部分是用来实时计算指标的,tick回测中使用handle_tick,分钟回测中使用handle_data,大家可以把它复制保存talib_real.py到研究目录下就可以在回测中调用。指标测试结果我都跟通达信的对比过,基本上是吻合的,大家如果在使用中发现bug欢迎指正

# coding:utf-8
"""
@author: leonardo
@created time: 2018-09-28
@last modified time:2018-10-10
"""
import abc, six, math
from jqlib.technical_analysis import *
from jqdata import *


@six.add_metaclass(abc.ABCMeta)
class IQuant():
    """
    基于聚宽分钟(及tick)回测框架计算日线的实时指标接口,在分钟回测中使用self.handle_data(),在tick回测中使用self.handle_tick()两者只能二选一
    """

    @abc.abstractmethod
    def before_market_open(self, context):
        """
        在开盘之前执行
        :param context:
        :return:
        """
        pass

    @abc.abstractmethod
    def after_market_close(self, context):
        """
        在收盘之后执行
        ps:在此函数中需要执行一遍self._eval()是因为handle_data只执行到14:59就结束,收盘的数据15:00需要再次计算
        :param context:
        :return:
        """
        pass

    @abc.abstractmethod
    def handle_data(self, context, data):
        """
        在盘中(handle_data)执行
        :param context:
        :param data:
        :return:
        """
        pass

    @abc.abstractmethod
    def handle_tick(self, context, tick):
        """
        在盘中(handle_tick)执行
        :param context:
        :param tick:
        :return:
        """
        pass

    @abc.abstractmethod
    def value(self):
        """
        返回指标
        :return:
        """
        pass

    @abc.abstractmethod
    def check(self):
        """
        决定是否买卖
        {negative:sell;0:none;positive:buy}
        :return:
        """
        pass

    @abc.abstractmethod
    def to_string(self):
        """
        用于打印指标(方便测试)
        :return:
        """
        pass


class KD_Real(IQuant):
    """
    RSV:=(CLOSE-LLV(LOW,N))/(HHV(HIGH,N)-LLV(LOW,N))*100;
    K:SMA(RSV,M1,1);
    D:SMA(K,M2,1);
    """

    def __init__(self, security, N=5, M1=3, M2=3):
        self._value = {}
        self._security = security
        self._N = N
        self._M1 = M1
        self._M2 = M2

        self._value['K'] = None
        self._value['D'] = None

        self._rate = 1 + 0.015 * N  # 假定在过去(N*2-1)个交易日最高最低价超过一定比率

    def before_market_open(self, context):
        if self._value['K'] is None:  # first init
            K_, D_ = KD([self._security], check_date=context.previous_date.strftime('%Y-%m-%d'), N=self._N, M1=self._M1,
                        M2=self._M2)
            self._value['K_'] = K_[self._security]
            self._value['D_'] = D_[self._security]
        else:  # update today data as previous data
            self._value['K_'] = self._value['K']
            self._value['D_'] = self._value['D']
        self._previous_date = context.previous_date.strftime('%Y-%m-%d')
        # 由于存在复权,hhv和llv可能会有所改变,所以需要每天重新初始化
        bars = get_price(self._security, end_date=self._previous_date, frequency='1d',
                         fields=['high', 'low', 'close'], skip_paused=True, count=self._N - 1)
        self._value['hhv'] = max(bars['high'])
        self._value['llv'] = min(bars['low'])

    def after_market_close(self, context):
        close = get_price(self._security, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                          fields=['close'], skip_paused=True, count=1)['close'][0]
        self._eval(close, close, close)

    def handle_data(self, context, data):
        self._eval(high=data[self._security].high, low=data[self._security].low, close=data[self._security].close)

    def handle_tick(self, context, tick):
        self._eval(tick.current, tick.current, tick.current)

    def _eval(self, high, low, close):
        self._current_price = close
        if high > self._value['hhv']:
            self._value['hhv'] = high
        elif low < self._value['llv']:
            self._value['llv'] = low
        rsv = 100.0 * (close - self._value['llv']) / (self._value['hhv'] - self._value['llv'])
        self._value['K'] = (rsv + (self._M1 - 1) * self._value['K_']) / self._M1
        self._value['D'] = (self._value['K'] + (self._M2 - 1) * self._value['D_']) / self._M2

    def value(self):
        return self._value

    def check(self):
        if self._value['K_'] < self._value['D_']:
            if self._value['K'] > self._value['D']:  # 金叉
                return 1
                # bars = get_price(self._security, end_date=self._previous_date, frequency='1d',
                #                  fields=['high', 'low'], skip_paused=True, count=(self._N * 2 - 1))
                # hhv = max(bars['high'])
                # llv = min(bars['low'])
                # if hhv / llv >= self._rate and self._current_price < (hhv + llv) / 2.0:
                #     return 1
        elif self._value['K_'] > self._value['D_']:
            if self._value['K'] < self._value['D']:  # 死叉
                return -1
        return 0

    def to_string(self):
        return '{K:' + str(self._value['K']) + '    D:' + str(self._value['D']) + '}'


class SKDJ_Real(IQuant):
    """
    LOWV:=LLV(LOW,N);
    HIGHV:=HHV(HIGH,N);
    RSV:=EMA((CLOSE-LOWV)/(HIGHV-LOWV)*100,M);#区别于KD指标,该指标更平滑
    K:EMA(RSV,M);
    D:MA(K,M);
    """

    def __init__(self, security, N=5, M=3):
        self._value = {}
        self._security = security
        self._N = N
        self._M = M

        self._value['K'] = None
        self._value['D'] = None

        alpha = 2.0 / (M + 1)
        self._need_N = int(math.log(0.00001 / alpha, 1 - alpha))

    def before_market_open(self, context):
        self._previous_date = context.previous_date.strftime('%Y-%m-%d')
        if self._value['K'] is None:  # first init
            trade_days = get_trade_days(end_date=context.previous_date, count=self._M - 1)
            self._K_list = []
            for day in trade_days:
                K_, D_ = SKDJ([self._security], check_date=day.strftime('%Y-%m-%d'), N=self._N, M=self._M)
                self._K_list.append(K_[self._security])
            self._value['K_'] = K_[self._security]
            self._value['D_'] = D_[self._security]

            self._rsv_list = []
            tmp_bars = get_price(self._security, end_date=self._previous_date, frequency='1d',
                                 fields=['high', 'low', 'close'], skip_paused=True,
                                 count=self._need_N + self._M + self._N - 2)
            highs = tmp_bars['high']
            lows = tmp_bars['low']
            closes = tmp_bars['close']
            tmp_rsv_list = []
            for i in range(self._need_N + self._M - 1):
                lowv = min(lows[i:i + self._N])
                highv = max(highs[i:i + self._N])
                tmp_rsv_list.append(100 * (closes[i + self._N - 1] - lowv) / (highv - lowv))
            for i in range(self._M - 1):
                self._rsv_list.append(sma_cn(tmp_rsv_list[i:i + self._need_N + 1], n=self._M + 1, m=2))

        else:  # update today data as previous data
            self._value['K_'] = self._value['K']
            self._value['D_'] = self._value['D']

        # 由于存在复权,hhv和llv可能会有所改变,所以需要每天重新初始化
        bars = get_price(self._security, end_date=self._previous_date, frequency='1d',
                         fields=['high', 'low'], skip_paused=True, count=self._N - 1)
        self._value['hhv'] = max(bars['high'])
        self._value['llv'] = min(bars['low'])
        self._sum_k = sum(self._K_list)

    def after_market_close(self, context):
        close = get_price(self._security, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                          fields=['close'], skip_paused=True, count=1)['close'][0]
        self._eval(close, close, close)

        self._rsv_list.pop(0)
        self._rsv_list.append(self._rsv)
        self._K_list.pop(0)
        self._K_list.append(self._value['K'])

    def handle_data(self, context, data):
        self._eval(high=data[self._security].high, low=data[self._security].low, close=data[self._security].close)

    def handle_tick(self, context, tick):
        self._eval(tick.current, tick.current, tick.current)

    def _eval(self, high, low, close):
        self._current_price = close
        if high > self._value['hhv']:
            self._value['hhv'] = high
        elif low < self._value['llv']:
            self._value['llv'] = low
        self._rsv = (self._rsv_list[-1] * (self._M - 1) + 200.0 * (close - self._value['llv']) / (
                self._value['hhv'] - self._value['llv'])) / (self._M + 1)

        self._value['K'] = (self._rsv * 2 + (self._M - 1) * self._value['K_']) / (self._M + 1)
        self._value['D'] = (self._sum_k + self._value['K']) / self._M

    def value(self):
        return self._value

    def check(self):
        if self._value['K_'] < self._value['D_']:
            if self._value['K'] > self._value['D']:  # 金叉
                return 1
        elif self._value['K_'] > self._value['D_']:
            if self._value['K'] < self._value['D']:  # 死叉
                return -1
        return 0

    def to_string(self):
        return '{K:' + str(self._value['K']) + '    D:' + str(self._value['D']) + '}'


class MACD_Real(IQuant):
    """
    DIF:EMA(CLOSE,SHORT)-EMA(CLOSE,LONG);
    DEA:EMA(DIF,MID);
    MACD:(DIF-DEA)*2
    """

    def __init__(self, security, short=12, long=26, mid=9):
        self._value = {}

        self._security = security
        self._short = short
        self._long = long
        self._mid = mid

        self._value['dif'] = None
        self._value['dea'] = None
        self._value['macd'] = None

    def before_market_open(self, context):

        if self._value['dif'] is None:
            self._value['short_ema_'] = \
                EMA([self._security], check_date=context.previous_date.strftime('%Y-%m-%d'), timeperiod=self._short)[
                    self._security]
            self._value['long_ema_'] = EMA([self._security], check_date=context.previous_date.strftime('%Y-%m-%d'),
                                           timeperiod=self._long)[self._security]
            dif, dea, macd = MACD([self._security], check_date=context.previous_date.strftime('%Y-%m-%d'),
                                  SHORT=self._short, LONG=self._long, MID=self._mid)
            self._value['dif_'] = dif[self._security]
            self._value['dea_'] = dea[self._security]
        else:  # 以下四个参数与close是线性关系,所以复权后也不会改变,可以直接使用
            self._value['short_ema_'] = self._value['short_ema']
            self._value['long_ema_'] = self._value['long_ema']
            self._value['dif_'] = self._value['dif']
            self._value['dea_'] = self._value['dea']

    def after_market_close(self, context):
        close = get_price(self._security, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                          fields=['close'], skip_paused=True, count=1)['close'][0]
        self._eval(close)

    def handle_data(self, context, data):
        self._eval(data[self._security].close)

    def handle_tick(self, context, tick):
        self._eval(tick.current)

    def _eval(self, close):
        self._value['short_ema'] = (2 * close + (self._short - 1) * self._value['short_ema_']) / (
                self._short + 1.0)
        self._value['long_ema'] = (2 * close + (self._long - 1) * self._value['long_ema_']) / (
                self._long + 1.0)
        self._value['dif'] = self._value['short_ema'] - self._value['long_ema']
        self._value['dea'] = (2 * self._value['dif'] + (self._mid - 1) * self._value['dea_']) / (self._mid + 1.0)
        self._value['macd'] = 2 * (self._value['dif'] - self._value['dea'])

    def value(self):
        return self._value

    def check(self):
        if self._value['dif_'] < self._value['dea_']:
            if self._value['dif'] > self._value['dea']:  # 金叉
                return 1
        elif self._value['dif_'] > self._value['dea_']:
            if self._value['dif'] < self._value['dea']:  # 死叉
                return -1
        return 0

    def to_string(self):
        return '{dif:' + str(self._value['dif']) + '    dea:' + str(self._value['dea']) + '    macd:' + str(
            self._value['macd']) + '}'


class RSI_Real(IQuant):
    """
    LC:=REF(CLOSE,1);
    RSI1:SMA(MAX(CLOSE-LC,0),N1,1)/SMA(ABS(CLOSE-LC),N1,1)*100;
    RSI2:SMA(MAX(CLOSE-LC,0),N2,1)/SMA(ABS(CLOSE-LC),N2,1)*100;
    RSI3:SMA(MAX(CLOSE-LC,0),N3,1)/SMA(ABS(CLOSE-LC),N3,1)*100;
    """

    def __init__(self, security, N1=6, N2=12, N3=24):
        self._value = {}

        self._security = security
        self._N1 = N1
        self._N2 = N2
        self._N3 = N3

        self._max_N = self.__calculate_max_n(max(N1, N2, N3))
        self._need_N1 = self.__calculate_max_n(N1)
        self._need_N2 = self.__calculate_max_n(N2)
        self._need_N3 = self.__calculate_max_n(N3)

        self._value['rsi1'] = None

    def before_market_open(self, context):
        if self._value['rsi1'] is None:
            previous_date = context.previous_date.strftime('%Y-%m-%d')
            tmp = RSI([self._security], check_date=previous_date, N1=self._N1)
            self._value['rsi1_'] = tmp[self._security]
            tmp = RSI([self._security], check_date=previous_date, N1=self._N2)
            self._value['rsi2_'] = tmp[self._security]
            tmp = RSI([self._security], check_date=previous_date, N1=self._N3)
            self._value['rsi3_'] = tmp[self._security]
        else:
            self._value['rsi1_'] = self._value['rsi1']
            self._value['rsi2_'] = self._value['rsi2']
            self._value['rsi3_'] = self._value['rsi3']
        # 可能存在复权,get_price的数据不能用来缓存计算(也可以通过上一个收盘价和当天取到的收盘价比较是否存在复权,这里为了简化代码(偷懒)直接重新拉取数据)
        self._X = \
            get_price(self._security, end_date=context.previous_date.strftime('%Y-%m-%d'), frequency='1d',
                      fields=['close'], skip_paused=True, count=self._max_N)['close']
        self._lc = self._X[-1]
        self._max_X, self._abs_X = self.__cumminus(self._X)
        # 今日值的占位符
        self._max_X.append(0)
        self._abs_X.append(0)

    def after_market_close(self, context):
        close = get_price(self._security, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                          fields=['close'], skip_paused=True, count=1)['close'][0]
        self._eval(close)

    def handle_data(self, context, data):
        self._eval(data[self._security].close)

    def handle_tick(self, context, tick):
        self._eval(tick.current)

    def _eval(self, close):
        dx = close - self._lc
        if dx > 0:
            self._abs_X[-1] = dx
            self._max_X[-1] = dx
        else:
            self._abs_X[-1] = -dx
            self._max_X[-1] = 0

        max_sma = sma_cn(X=self._max_X[-self._need_N1:], n=self._N1, m=1)
        abs_sma = sma_cn(X=self._abs_X[-self._need_N1:], n=self._N1, m=1)
        self._value['rsi1'] = 100.0 * max_sma / abs_sma

        max_sma = sma_cn(X=self._max_X[-self._need_N2:], n=self._N2, m=1)
        abs_sma = sma_cn(X=self._abs_X[-self._need_N2:], n=self._N2, m=1)
        self._value['rsi2'] = 100.0 * max_sma / abs_sma

        max_sma = sma_cn(X=self._max_X[-self._need_N3:], n=self._N3, m=1)
        abs_sma = sma_cn(X=self._abs_X[-self._need_N3:], n=self._N3, m=1)
        self._value['rsi3'] = 100.0 * max_sma / abs_sma

    def value(self):
        return self._value

    def check(self):
        if self._value['rsi1_'] < self._value['rsi2_']:
            if self._value['rsi1'] > self._value['rsi2']:  # 金叉
                return 1
        elif self._value['rsi1_'] > self._value['rsi2_']:
            if self._value['rsi1'] < self._value['rsi2']:  # 死叉
                return -1
        return 0

    def __cumminus(self, X):
        rs_max = []
        rs_abs = []
        for i in range(len(X) - 1):
            dx = X[i + 1] - X[i]
            if dx > 0:
                rs_abs.append(dx)
                rs_max.append(dx)
            else:
                rs_abs.append(-dx)
                rs_max.append(0)
        return rs_max, rs_abs

    def __calculate_max_n(self, n, theta=0.00001):
        """
        计算需要迭代的次数,保证最后的叠加量不超过theta
        :param n:
        :param theta:
        :return:
        """
        alpha = 1.0 / n
        beta = 1 - alpha
        return int(math.log(theta / alpha, beta))

    def to_string(self):
        return '{rsi1:' + str(self._value['rsi1']) + '    rsi2:' + str(self._value['rsi2']) + '    rsi3:' + str(
            self._value['rsi3']) + '}'


class ROC_Real(IQuant):
    """
    ROC:100*(CLOSE-REF(CLOSE,N))/REF(CLOSE,N);
    MAROC:MA(ROC,M);
    """

    def __init__(self, security, N=12, M=6):
        self._value = {}

        self._security = security
        self._N = N
        self._M = M

        self._roc_list = None

    def before_market_open(self, context):
        if self._roc_list is None:
            bars = get_price(self._security, end_date=context.previous_date.strftime('%Y-%m-%d'), frequency='1d',
                             fields=['close'], skip_paused=True, count=(self._N + self._M - 1))['close']
            self._lc = bars[-self._N]
            self._roc_list = map(lambda x, y: 100 * (y / x - 1), bars[:self._M - 1], bars[self._N:])
        else:
            self._lc = get_price(self._security, end_date=context.previous_date.strftime('%Y-%m-%d'), frequency='1d',
                                 fields=['close'], skip_paused=True, count=self._N)['close'][0]
        self._sum_roc = sum(self._roc_list)

    def after_market_close(self, context):
        close = get_price(self._security, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                          fields=['close'], skip_paused=True, count=1)['close'][0]
        self._eval(close)
        self._roc_list.pop(0)
        self._roc_list.append(self._value['roc'])

    def handle_data(self, context, data):
        self._eval(data[self._security].close)

    def handle_tick(self, context, tick):
        self._eval(tick.current)

    def _eval(self, close):
        self._value['roc'] = 100 * (close / self._lc - 1)
        self._value['maroc'] = (self._sum_roc + self._value['roc']) / self._M

    def value(self):
        return self._value

    def check(self):
        # TODO
        return 0

    def to_string(self):
        return '{roc:' + str(self._value['roc']) + '   maroc:' + str(self._value['maroc']) + '}'


class MA_Real(IQuant):
    """
    收盘价均线
    """

    def __init__(self, security, M1=5, M2=10, M3=20, M4=60):
        self._value = {}

        self._security = security
        self._M1 = M1
        self._M2 = M2
        self._M3 = M3
        self._M4 = M4

        self._max_M = max(M1, M2, M3, M4)

        self._value['ma1'] = None

    def before_market_open(self, context):
        if self._value['ma1'] is None:
            self._list = list(
                get_price(self._security, end_date=context.previous_date.strftime('%Y-%m-%d'), frequency='1d',
                          fields=['close'], skip_paused=True, count=self._max_M - 1)['close'])
        self._sum1 = sum(self._list[-self._M1 + 1:])
        self._sum2 = sum(self._list[-self._M2 + 1:])
        self._sum3 = sum(self._list[-self._M3 + 1:])
        self._sum4 = sum(self._list[-self._M4 + 1:])

    def after_market_close(self, context):
        close = get_price(self._security, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                          fields=['close'], skip_paused=True, count=1)['close'][0]
        self._eval(close)

        self._list.pop(0)
        self._list.append(close)

    def handle_data(self, context, data):
        self._eval(data[self._security].close)

    def handle_tick(self, context, tick):
        self._eval(tick.current)

    def _eval(self, close):
        self._value['ma1'] = (self._sum1 + close) / self._M1
        self._value['ma2'] = (self._sum2 + close) / self._M2
        self._value['ma3'] = (self._sum3 + close) / self._M3
        self._value['ma4'] = (self._sum4 + close) / self._M4

    def value(self):
        return self._value

    def check(self):
        # TODO
        return 0

    def to_string(self):
        return '{ma1:' + str(self._value['ma1']) + '    ma2:' + str(self._value['ma2']) + '    ma3:' + str(
            self._value['ma3']) + '    ma4:' + str(self._value['ma4']) + '}'


class VMA_Real(IQuant):
    """
    自定义权重均线
    VV:=(HIGH * K1 + OPEN * K2 + LOW * K3 + CLOSE * K4)/(K1+K2+K3+K4);
    VMA1:MA(VV,M1);
    VMA2:MA(VV,M2);
    """

    def __init__(self, security, M1=5, M2=10, M_HIGH=1, M_LOW=1, M_OPEN=2, M_CLOSE=2):
        self._value = {}

        self._security = security
        self._M1 = M1
        self._M2 = M2
        self._M_HIGH = M_HIGH
        self._M_LOW = M_LOW
        self._M_OPEN = M_OPEN
        self._M_CLOSE = M_CLOSE

        self._max_M = max(M1, M2)
        self._sum_M = M_OPEN + M_CLOSE + M_HIGH + M_LOW

        self._value['ma1'] = None

    def before_market_open(self, context):
        if self._value['ma1'] is None:
            bars = get_price(self._security, end_date=context.previous_date.strftime('%Y-%m-%d'), frequency='1d',
                             fields=['high', 'low', 'open', 'close'], skip_paused=True, count=self._max_M)
            vv_list = (bars['high'] * self._M_HIGH + bars['low'] * self._M_LOW + bars['open'] * self._M_OPEN + bars[
                'close'] * self._M_CLOSE) / self._sum_M
            self._value['ma1_'] = vv_list[-self._M1:].sum()
            self._value['ma2_'] = vv_list[-self._M2:].sum()
            self._vv_list = list(vv_list[1:])

        else:
            self._value['ma1_'] = self._value['ma1']
            self._value['ma2_'] = self._value['ma2']

        self._sum1 = sum(self._vv_list[-self._M1 + 1:])
        self._sum2 = sum(self._vv_list[-self._M2 + 1:])

        self._open = None
        self._high = 0
        self._low = 1000000

    def after_market_close(self, context):
        today = get_price(self._security, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                          fields=['close', 'open', 'high', 'low'], skip_paused=True, count=1).iloc[0]
        vv = (
                     today.high * self._M_HIGH + today.low * self._M_LOW + today.open * self._M_OPEN + today.close * self._M_CLOSE) / self._sum_M
        self._eval(today.high, today.low, today.close)

        self._vv_list.pop(0)
        self._vv_list.append(vv)

    def handle_data(self, context, data):
        if self._open is None:
            self._open = data[self._security].open

        self._eval(data[self._security].high, data[self._security].low, data[self._security].close)

    def handle_tick(self, context, tick):
        if self._open is None:
            self._open = tick.current
        self._eval(tick.current, tick.current, tick.current)

    def _eval(self, high, low, close):
        if self._high < high:
            self._high = high
        if self._low > low:
            self._low = low
        vv = (
                     self._high * self._M_HIGH + self._low * self._M_LOW + self._open * self._M_OPEN + close * self._M_CLOSE) / self._sum_M
        self._value['ma1'] = (self._sum1 + vv) / self._M1
        self._value['ma2'] = (self._sum2 + vv) / self._M2

    def value(self):
        return self._value

    def check(self):
        """
        五日线上穿十日线买进
        :return:
        """
        if self._value['ma1_'] < self._value['ma2_']:
            if self._value['ma1'] > self._value['ma2']:
                return 1
        elif self._value['ma1_'] > self._value['ma2_']:
            if self._value['ma1'] < self._value['ma2']:
                return -1
        return 0

    def to_string(self):
        return '{ma1:' + str(self._value['ma1']) + '    ma2:' + str(self._value['ma2']) + '}'


class CCI_Real(IQuant):
    """
      TYP:=(HIGH+LOW+CLOSE)/3;
      CCI:(TYP-MA(TYP,N))/(0.015*AVEDEV(TYP,N));
      其中AVEDEV=AVG(abs(MA-TYP))
    """

    def __init__(self, security, N=14):
        self._value = {}

        self._security = security
        self._N = N

        self._value['cci'] = None

    def before_market_open(self, context):
        if self._value['cci'] is None:
            bars = get_price(self._security, end_date=context.previous_date.strftime('%Y-%m-%d'), frequency='1d',
                             fields=['close', 'high', 'low'], skip_paused=True, count=self._N - 1)
            self._typ_list = ((bars.close + bars.high + bars.low) / 3.0).tolist()

        self._typ_list.append(0)  # 占位符
        self._sum_typ = sum(self._typ_list)

        self._high = 0
        self._low = 1000000

    def after_market_close(self, context):
        today_bar = get_price(self._security, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                              fields=['close', 'high', 'low'], skip_paused=True, count=1).iloc[0]
        typ = (today_bar.close + today_bar.high + today_bar.low) / 3
        self._eval(typ)

        self._typ_list.pop(0)

    def handle_data(self, context, data):
        if self._high < data[self._security].high:
            self._high = data[self._security].high
        if self._low > data[self._security].low:
            self._low = data[self._security].low
        typ = (data[self._security].close + self._high + self._low) / 3
        self._eval(typ)

    def handle_tick(self, context, tick):
        if self._high < tick.current:
            self._high = tick.current
        if self._low > tick.current:
            self._low = tick.current
        typ = (tick.current + self._high + self._low) / 3
        self._eval(typ)

    def _eval(self, typ):
        self._typ_list[-1] = typ
        ma_typ = (self._sum_typ + typ) / self._N
        adeved = sum(map(lambda x: abs(x - ma_typ), self._typ_list)) / self._N
        self._value['cci'] = (typ - ma_typ) / (0.015 * adeved)

    def value(self):
        return self._value

    def check(self):
        """
        TODO
        1.CCI 为正值时,视为多头市场;为负值时,视为空头市场;
        2.常态行情时,CCI 波动于±100 的间;强势行情,CCI 会超出±100 ;
        3.CCI>100 时,买进,直到CCI<100 时,卖出;
        4.CCI<-100 时,放空,直到CCI>-100 时,回补。
        :return:
        """
        return 0

    def to_string(self):
        return '{cci:' + str(self._value['cci']) + '}'


class ADTM_Real(IQuant):
    """
    DTM:=IF(OPEN<=REF(OPEN,1),0,MAX((HIGH-OPEN),(OPEN-REF(OPEN,1))));
    DBM:=IF(OPEN>=REF(OPEN,1),0,MAX((OPEN-LOW),(OPEN-REF(OPEN,1))));#后面部分(OPEN-REF(OPEN,1)明显小于0,不知道通达信写这部分意义何在?
    STM:=SUM(DTM,N);
    SBM:=SUM(DBM,N);
    ADTM:IF(STM>SBM,(STM-SBM)/STM,IF(STM=SBM,0,(STM-SBM)/SBM));
    MAADTM:MA(ADTM,M);
    """

    def __init__(self, security, N=23, M=8):
        self._value = {}

        self._security = security
        self._N = N
        self._M = M

        self._adtm_list = None

    def before_market_open(self, context):
        if self._adtm_list is None:
            bars = get_price(self._security, end_date=context.previous_date.strftime('%Y-%m-%d'), frequency='1d',
                             fields=['open', 'high', 'low'], skip_paused=True, count=self._N)
            self._open_list = list(bars['open'])
            self._high_list = list(bars['high'])
            self._low_list = list(bars['low'])
            self._stm_list, self._sbm_list = self.__stm_sbm()
            trade_days = get_trade_days(end_date=context.previous_date.strftime('%Y-%m-%d'), count=self._M - 1)
            self._adtm_list = []
            for trade_day in trade_days:
                ADTM_, MAADTM = ADTM(self._security, check_date=trade_day.strftime('%Y-%m-%d'), N=self._N, M=self._M)
                self._adtm_list.append(ADTM_[self._security])
            self._value['adtm_'] = ADTM_[self._security]
            self._value['maadtm_'] = MAADTM[self._security]
        else:
            self._value['adtm_'] = self._value['adtm']
            self._value['maadtm_'] = self._value['maadtm']
        self._STM = sum(self._stm_list)
        self._SBM = sum(self._sbm_list)
        self._sum_adtm = sum(self._adtm_list)

        self._open = None
        self._high = 0
        self._low = 1000000  # 假定股价没有超过1000000

    def after_market_close(self, context):
        today_bar = get_price(self._security, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                              fields=['open', 'high', 'low'], skip_paused=True, count=1).iloc[0]
        self._eval(today_bar.high, today_bar.low, today_bar.open)

        self._stm_list.pop(0)
        self._stm_list.append(self._stm)
        self._sbm_list.pop(0)
        self._sbm_list.append(self._sbm)
        self._adtm_list.pop(0)
        self._adtm_list.append(self._adtm)
        self._high_list.pop(0)
        self._high_list.append(today_bar.high)
        self._low_list.pop(0)
        self._low_list.append(today_bar.low)
        self._open_list.pop(0)
        self._open_list.append(today_bar.open)

    def handle_data(self, context, data):
        self._eval(data[self._security].high, data[self._security].low, data[self._security].open)

    def handle_tick(self, context, tick):
        self._eval(tick.current, tick.current, tick.current)

    def _eval(self, high, low, open):
        if self._open is None:
            self._open = open
            self._dx = self._open - self._open_list[-1]
        if self._high < high:
            self._high = high
        if self._low > low:
            self._low = low

        # dx = open - self._open_list[-1]
        self._stm, self._sbm = 0, 0
        if self._dx < 0:
            self._stm = 0
            self._sbm = open - low
        elif self._dx > 0:
            self._stm = max(high - open, self._dx)
            self._sbm = 0

        STM = self._STM + self._stm
        SBM = self._SBM + self._sbm
        self._adtm = 0
        if STM > SBM:
            self._adtm = (STM - SBM) / STM
        elif STM < SBM:
            self._adtm = (STM - SBM) / SBM

        self._value['adtm'] = self._adtm
        self._value['maadtm'] = (self._sum_adtm + self._adtm) / self._M

    def value(self):
        return self._value

    def check(self):
        """
        1.ADTM指标在+1到-1之间波动。
        2.低于-0.5时为低风险区,高于+0.5时为高风险区,需注意风险。
        3.ADTM上穿ADTMMA时,买入股票;ADTM跌穿ADTMMA时,卖出股票。
        :return:
        """
        if self._value['adtm_'] < self._value['maadtm_']:
            if self._value['adtm'] > self._value['maadtm']:
                return 1
        else:
            if self._value['adtm'] < self._value['maadtm']:
                return -1
        return 0

    def __stm_sbm(self):
        stm_list = []
        sbm_list = []
        for i in range(len(self._open_list) - 1):
            dx = self._open_list[i + 1] - self._open_list[i]
            if dx < 0:
                stm_list.append(0)
                sbm_list.append(self._open_list[i + 1] - self._low_list[i + 1])
            elif dx > 0:
                stm_list.append(max(self._high_list[i + 1] - self._open_list[i + 1], dx))
                sbm_list.append(0)
            else:
                stm_list.append(0)
                sbm_list.append(0)
        return stm_list, sbm_list

    def to_string(self):
        return '{adtm:' + str(self._value['adtm']) + '    maadtm:' + str(self._value['maadtm']) + '}'


class BOLL_Real(IQuant):
    """
    BOLL:MA(CLOSE,M);
    UB:BOLL+2*STD(CLOSE,M);
    LB:BOLL-2*STD(CLOSE,M);
    """

    def __init__(self, security, M=20):
        self._security = security
        self._M = M

        self._value = {}
        self._close_list = None

    def before_market_open(self, context):
        if self._close_list is None:
            self._close_list = list(
                get_price(self._security, end_date=context.previous_date.strftime('%Y-%m-%d'), frequency='1d',
                          fields=['close'], skip_paused=True, count=self._M - 1)['close'])
        self._sum = sum(self._close_list)
        self._close_list.append(0)  # 占位符

    def after_market_close(self, context):
        close = get_price(self._security, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                          fields=['close'], skip_paused=True, count=1)['close'][0]
        self._eval(close)

        self._close_list.pop(0)

    def handle_data(self, context, data):
        self._eval(data[self._security].close)

    def handle_tick(self, context, tick):
        self._eval(tick.current)

    def _eval(self, close):
        self._value['boll'] = (self._sum + close) / self._M
        self._close_list[-1] = close
        std = math.sqrt(sum(map(lambda x: math.pow(x - self._value['boll'], 2), self._close_list)) / (self._M - 1))
        # std=np.std(self._close_list,ddof=1)
        self._value['ub'] = self._value['boll'] + 2 * std
        self._value['lb'] = self._value['boll'] - 2 * std

    def value(self):
        return self._value

    def check(self):
        # TODO
        return 0

    def to_string(self):
        return '{boll:' + str(self._value['boll']) + '    ub:' + str(self._value['ub']) + '    lb:' + str(
            self._value['lb']) + '}'


# ==================多指标共振集合================================
class QuantFactory(object):
    """
    多指标集合
    """

    def __init__(self):
        self._factory = []

    def add(self, factor):
        self._factory.append(factor)

    def before_market_open(self, context):
        for fac in self._factory:
            fac.before_market_open(context)

    def after_market_close(self, context):
        for fac in self._factory:
            fac.after_market_close(context)

    def handle_data(self, context, data):
        for fac in self._factory:
            fac.handle_data(context, data)

    def handle_tick(self, context, tick):
        for fac in self._factory:
            fac.handle_tick(context, tick)

    def check(self):
        for fac in self._factory:
            if fac.check() <= 0:
                return False
        return True


def sma_cn(X, n, m=2):
    y = X[0]
    for i in range(1, len(X)):
        y = (m * X[i] + (n - m) * y) / n
    return y

下面这部分是卖出策略,好多方法还待实现(其中有些部分是在本地测试用的这里也没有删除)

# -*- coding:utf-8 -*-
"""
@author: leonardo
@created time: 2018-10-10
@last modified time:2018-10-10
@description: 用于聚宽回测止盈止损策略
"""
import abc, six
from jqlib.technical_analysis import *


@six.add_metaclass(abc.ABCMeta)
class IBotSeller():
    """
    基于聚宽分钟(及tick)回测框架计算止盈止损接口,在分钟回测中使用self.handle_data(),在tick回测中使用self.handle_tick()两者只能二选一
    """

    @abc.abstractmethod
    def before_market_open(self, context):
        """
        在开盘之前执行
        :param context:
        :return:
        """
        pass

    @abc.abstractmethod
    def after_market_close(self, context):
        """
        在收盘之后执行
        ps:在此函数中需要执行一遍self._eval()是因为handle_data只执行到14:59就结束,收盘的数据15:00需要再次计算
        :param context:
        :return:
        """
        pass

    @abc.abstractmethod
    def handle_data(self, context, data):
        """
        在盘中(handle_data)执行
        :param context:
        :param data:
        :return:{True:卖出,False:继续持仓}
        """
        pass

    @abc.abstractmethod
    def handle_tick(self, context, tick):
        """
        在盘中(handle_tick)执行
        :param context:
        :param tick:
        :return:{True:卖出,False:继续持仓}
        """
        pass

    @abc.abstractmethod
    def sell_reason(self):
        """
        用于打印触发卖出的条件(方便测试)
        :return:
        """
        pass


class MonkeySeller(IBotSeller):
    """
    止盈止损策略
    """

    def __init__(self, code, buy_price, volume, stop_loss=0.03, moving_stop_loss=0.015):
        """

        :param code:
        :param buy_price:
        :param volume:
        :param stop_loss:
        :param moving_stop_loss:
        """
        self._code = code
        self._buy_price = buy_price
        self._volume = volume
        self._stop_loss_price = (1 - stop_loss) * buy_price
        self._moving_stop_loss = 1 - moving_stop_loss  # 节省计算时间

        # ---------end init value--------------------

        self._high = 0  # 自买入后经历的最高价
        self._today_high = 0
        self._today_high_time = '09:30:00'  # 监控当天最高价的时刻
        self._current_price = buy_price
        # self._k_bar = k_bar_util.K_Bar(time_step=2)
        self._is_limit_up = False  # 是否涨停
        self._limit_up_price = 0
        self._pre_bar = None  # 上一次交易日的K线数据,至少包含[open,close,high,low,vol,amount],多余属性暂且不管
        self._prediction_data = None  # gru模型预测的数据(该数据由另外一个程序生成,为避免其出错在使用时需要判断None)

        self._sell_reason = None  # 触发卖出的原因
        # ---------end set value-------------------

        if code.startswith('60'):  # 以60开头的为沪市
            self._code_symbol = self._code + '.XSHG'
        else:  # 以000开头的为深市
            self._code_symbol = self._code + '.XSHE'

    def before_market_open(self, context):
        self._today_high_time = '09:30:00'  # 开盘集合竞价结束开始连续竞价交易时刻
        self._today_high = 0

    def after_market_close(self, context):
        """
        每天收盘后更新数据作为明天的历史K数据,必须在每日收盘后执行一遍
        :param context: 暂时必须包含['close','high','low']
        :return:
        """
        self._pre_bar = get_price(self._code, end_date=context.current_dt.strftime('%Y-%m-%d'), frequency='1d',
                    skip_paused=True,
                    count=1).iloc[0]
        self._limit_up_price = calculate_limit_up(self._pre_bar['close'])

    def handle_data(self, context, data):
        return self.check_sell(price=data[self._code_symbol].close, check_time=context.current_dt.strftime('HH:MM:SS'))

    def handle_tick(self, context, tick):
        return self.check_sell(price=tick.current, check_time=context.current_dt.strftime('HH:MM:SS'))

    def sell_reason(self):
        return self._sell_reason

    def check_sell(self, price, check_time):
        """
        是否符合卖出条件
        :param price:
        :param check_time: hh:mm:ss
        :return:
        """
        self._current_price = price
        if self._high < self._current_price:
            self._high = self._current_price

        if self._today_high < self._current_price:
            self._today_high = self._current_price
            self._today_high_time = check_time

        # self._k_bar.add_tick(tick_data) #不是tick回测时暂时不需要

        if self._strategy_stop_loss():
            return True
        # if self._strategy_cross_down_yestoday_low():
        #     return True
        if self._strategy_moving_stop_loss():
            return True
        return False

    def _strategy_stop_loss(self):
        """
        低于止损价直接卖出
        :return:
        """
        if self._current_price < self._stop_loss_price:
            self._sell_reason = '低于止损价' + str(self._stop_loss_price)
            return True
        return False

    def _strategy_cross_down_yestoday_low(self):
        """突破昨日最低价直接卖出"""
        if self._current_price < self._pre_bar['low']:
            self._sell_reason = '突破昨日最低价' + str(self._pre_bar['low'])
            return True
        return False

    def _strategy_moving_stop_loss(self):
        """移动止损"""
        if self._current_price / self._buy_price > 1.015:  # 当超过1.5个点的收益时
            if self._current_price / self._high < self._moving_stop_loss:
                self._sell_reason = '移动止损从最高价' + str(self._high) + '下降' + str(round((1-self._moving_stop_loss)*100)) + '个百分点'
                return True
        return False

    def _strategy_high_time_limit(self, check_time, estimation_breakout_seconds):
        """
        TODO
        快速拉升(区分无量还是放量)在高点达到预定的收益时如果在预定时间内还未突破则卖出
        :param check_time:
        :param estimation_breakout_seconds:预计突破时间
        :return:
        """
        breakout_seconds = self.__time_delta(check_time, self._today_high_time)
        if breakout_seconds > estimation_breakout_seconds:
            return True
        return False

    def _strategy_limit_up(self):
        """
        TODO
        涨停时的策略
        :return:
        """
        pass

    def _strategy_to_4(self):
        """
        日涨幅0-4个点时的策略
        TODO
        :return:
        """
        pass

    def _strategy_4_to_7(self):
        """
        日涨幅4-7个点时的策略
        TODO
        :return:
        """
        pass

    def _strategy_7_to_10(self):
        """
        日涨幅7-10个点时的策略
        TODO
        :return:
        """
        pass

    def __time_delta(self, a, b):
        """
        时间间隔second
        :param a: xx:xx:xx
        :param b: xx:xx:xx
        :return: a-b
        """
        seconds = int(a[0:2]) - int(b[0:2])
        seconds *= 60
        seconds += int(a[3:5]) - int(b[3:5])
        seconds *= 60
        seconds += int(a[6:]) - int(b[6:])
        return seconds


# =====================辅助函数===========================================================
def calculate_limit_up(price):
    """
    10%的增长,小数点第三位四舍五入(排除st 5%的设定)
    :param price:
    :return:
    """
    price *= 1.1
    return round(price, 2)
分享到:
举报财经168客户端下载

全部回复

0/140

投稿 您想发表你的观点和看法?

更多人气分析师

  • 金算盘

    人气2696文章7761粉丝124

    高级分析师,混过名校,厮杀于股市和期货、证券市场多年,专注...

  • 李冉晴

    人气2296文章3821粉丝34

    李冉晴,专业现贷实盘分析师。

  • 张迎妤

    人气1896文章3305粉丝34

    个人专注于行情技术分析,消息面解读剖析,给予您第一时间方向...

  • 指导老师

    人气1856文章4423粉丝52

    暂无个人简介信息

  • 梁孟梵

    人气2152文章3177粉丝39

    qq:2294906466 了解群指导添加微信mfmacd

  • 刘钥钥1

    人气2016文章3119粉丝34

    专业从事现货黄金、现货白银模似实盘操作分析指导

  • 张亦巧

    人气2144文章4145粉丝45

    暂无个人简介信息

  • 金帝财神

    人气4720文章8329粉丝118

    本文由资深分析师金帝财神微信:934295330,指导黄金,白银,...

  • 金泰铬J

    人气2320文章3925粉丝51

    投资问答解咨询金泰铬V/信tgtg67即可获取每日的实时资讯、行情...