请 [注册] 或 [登录]  | 返回主站

量化交易吧 /  源码分享 帖子:3351394 新帖:12

发布聚宽功能增强器,极大降低你的编码难度!

汇市风云榜发表于:8 月 1 日 15:35回复(1)

上次发布了一系列的绩效函数,后来自己看了一下,发现要用到别的策略里去还是有些麻烦,于是动手进行了一些简化,将所有代码封装成了一个独立的 MyTrader 类,里面封装了大部分我们在聚宽编写策略时反复要用到的一些功能,使用起来比直接调用聚宽的 api 更简单,诸如 按比例自动调仓、自动平均调仓等功能。

具体使用方法,在研究下面新建一个 MyTrader.py 文件,然后将下面代码复制到该文件,之后只要在你的策略代码的顶部 使用 from MyTrader import MyTrader 即可引入,然后在策略代码开始的第一行 加上 trader = MyTrader(context, g) 就行了,很简单。

重点 推荐 buy_amount() 函数,你们可以试试加到自己的策略里去,不少策略回测的效果会有显著提升哦 -v- ,其他的,看代码注释就好了,很易用。

===================
buy_value()、sell_value()、buy_amount()、sell_amount() 都是对官方现有的 order 系列的包装,所不同的是这buy_value() 函数买入时会更积极,怎么理解呢?比如有时候看日志,你经常能看到 买入 180 股被系统重置为 100 股,但是实际上我们既然要买,肯定会倾向买入 200 股,实际测试的时候我也发现,相同策略改用 buy_value() 后回测的收益会比 用 order 系列函数高不少。

以下是代码举例:

buy_value('000001.XSHE', 10000)

买入 10000 元 中国银行

buy_amount('000001.XSHE', 100)

买入 100 股 平安银行

sell_value('000001.XSHE', 10000)

卖出 10000 元的 平安银行,如果第二个参数为0,则会直接清空 平安银行 的持仓

sell_amount('000001.XSHE', 100)

卖出 100 股的 平安银行,如果第二个参数为0,则会直接清空 平安银行 的持仓

adjust_target_percent('000001.XSHE', 0.5)

将当前持有的 平安银行 仓位调整为当前总金额的 50%(总金额=持仓市值 可用资金),假设当前平安银行的持仓总值为60%,那么就会卖出 10% 的股票,如果是 40%,则会买入 10%,总之会尽量保证仓位为 50%,这里要注意,是尽量,不是一定会,因为有时候你可能钱不够,买不到。
第二个参数为0的话就直接清空该股票仓位

adjust_target_value('000001.XSHE', 10000)

类似上面的函数,会将 中国平安 的持仓总额调整到 10000元左右

adjust_auto_avg()

这个函数不接受任何参数,会自动根据持仓股票数量动态计算每只股票的目标仓位,然后多出的卖掉,少了的买入,反正会尽量让他们仓位保持均衡。

以上就是几个比较关键的函数用法,剩下的那些用的机会少,看参数也能明白是干什么的,我就不多写了。

# coding=utf-8
import math
from kuanke.user_space_api import *
from pandas import Series


class MyTrader(object):
    def __init__(self, _context, _g, _print_log=True):
        super(MyTrader, self).__init__()
        self.__context = _context
        self.__g = _g
        self.__g.print_dict = {'sell': [], 'buy': []}
        self.__print_log = _print_log

        try:
            self.__g.trade_statics
        except:
            self.__g.trade_statics = {
                'total_count': 0,  # 交易总次数(全部卖出时算一次交易)
                'success_count': 0,  # 交易成功次数(卖出时盈利即算一次成功)
                'day_count': 0,  # 交易天数(每天累加一次)
                'win_percent': [],  # 盈利数据
                'loss_percent': [],  # 亏损数据
                'buy_history': Series(),  # 持股历史记录
                'order_list': [],  # 自制订单列表,用 DataFrame 来做好了
            }

        try:
            self.__g.stop_loss_list
        except:
            self.__g.stop_loss_list = {}  # 止损列表,{'000001.SHGH': '2016-04-21'}
            self.__g.stop_loss_count = 0  # 止损次数

        self.__add_day_count()

    # 买入指定数量的股票
    def buy_amount(self, stock, amount):
        if 100 <= amount:
            self.__g.print_dict["buy"].append({"stock": str(stock), "amount": int(amount)})
            order_result = order(stock, amount)
            '''
            if order_result is not None:
                if stock in self.__g.trade_statics['buy_history']:
                    self.__g.trade_statics['buy_history'][stock]  = 1
                else:
                    self.__g.trade_statics['buy_history'][stock] = 1
            '''
        else:
            self.log("买入 {2}({1}) amount 为 {0},不足100,无法买入".format(amount, stock, self.get_stock_name(stock)))

    # 买入指定金额的股票
    def buy_value(self, stock, value):
        price = self.get_stock_price(stock)
        if not math.isnan(price):
            amount = math.floor(round(value / price) / 100) * 100
            self.buy_amount(stock, amount)
        else:
            self.log("买入 {1}({0}) buy_value() 获得的 price 为 NAN,跳过该订单".format(stock, self.get_stock_name(stock)))

    # 卖出指定数量的股票,数量为0表示清空当前持仓
    def sell_amount(self, stock, amount):
        if 0 == amount:
            record_data = self.get_record_data(stock)

            amount = self.get_stock_sellable_amount(stock)
            self.__g.print_dict['sell'].append({"stock": str(stock), "amount": int(amount)})
            order_result = order_target_value(stock, 0)

            if order_result is not None:
                self.__g.trade_statics['total_count']  = 1
                if record_data['current_value'] > record_data['total_cost']:
                    self.__g.trade_statics['success_count']  = 1
                    self.__g.trade_statics['win_percent'].append([stock, record_data['percent'], self.get_today_date()])
                else:
                    self.__g.trade_statics['loss_percent'].append(
                        [stock, record_data['percent'], self.get_today_date()])
        else:
            self.__g.print_dict['sell'].append({"stock": str(stock), "amount": int(amount)})
            order(stock, -amount)

    # 卖出指定金额的股票,金额为0表示清空当前持仓
    def sell_value(self, stock, value):
        if 0 == int(value):
            amount = 0
        else:
            price = self.get_stock_price(stock)
            if not math.isnan(price):
                amount = math.floor(round(value / price) / 100) * 100
            else:
                self.log("卖出 {1}({0}) sell_value() 获得的 price 为 NAN ,跳过该订单".format(stock, self.get_stock_name(stock)))
                return

        self.sell_amount(stock, amount)

    # 将股票的持仓调整到指定的百分比,target_percent不能大于1或者小于0,若为0则直接清空所有仓位
    def adjust_target_percent(self, stock, target_percent):
        if 0 > target_percent or 1 < target_percent:
            self.log("股票 {1}({0}) target_percent 不能大于1或者小于0".format(stock, self.get_stock_name(stock)))
            return

        portfolio_value = self.get_portfolio_value()
        total_amount = self.get_stock_total_amount(stock)
        current_price = self.get_stock_last_sell_price(stock)
        current_value = total_amount * current_price
        target_value = portfolio_value * target_percent

        if 0 == target_percent:
            self.clean_position(stock)
            return

        if 0 == total_amount:
            self.buy_value(stock, target_value)
            return

        if target_value > current_value:
            self.buy_value(stock, target_value - current_value)
        elif target_value < current_value:
            self.sell_value(stock, current_value - target_value)

    # 将股票持仓调整到指定的金额,target_value不能小于0,若为0则直接清空所有仓位
    def adjust_target_value(self, stock, target_value):
        if 0 > target_value:
            self.log("股票 {1}({0}) target_value 不能小于0".format(stock, self.get_stock_name(stock)))
            return

        if 0 == target_value:
            self.clean_position(stock)
            return

        current_value = self.get_stock_total_amount(stock) * self.get_stock_last_sell_price(stock)

        if target_value > current_value:
            self.buy_value(stock, target_value - current_value)
        elif target_value < current_value:
            self.sell_value(stock, current_value - target_value)

    # 按持仓股票数量,自动将所有股票仓位调整到平均值
    def adjust_auto_avg(self):
        positions = self.get_current_positions()
        position_count = len(positions)
        if 0 >= position_count:
            self.log("当前没有持仓,无法自动平均仓位")
            return

        avg_percent = round(1.0 / position_count, 3)
        self.log("当前持仓数为 {0},所有股票仓位自动调整到 {1}%".format(position_count, avg_percent * 100))
        for stock in positions.keys():
            self.adjust_target_percent(stock, avg_percent)

    # 个股止损,limit表示最大亏损百分百
    def stock_stop_loss(self, stock, limit=8):
        amount = self.get_stock_total_amount(stock)
        total_cost = self.get_stock_avg_cost(stock) * amount
        position_value = self.get_stock_last_sell_price(stock) * amount

        if position_value > total_cost:
            return

        percent = round((position_value - total_cost) / total_cost * 100, 2)
        if percent <= -limit:
            # 卖出止损
            self.log('{0}({1}) 亏损已经超过 {2}%,卖出止损'.format(self.get_stock_name(stock), stock, percent))
            self.clean_position(stock)
            self.__g.stop_loss_count  = 1
            self.__g.stop_loss_list[stock] = self.get_today_date()

    # 清空当前所有可卖持仓
    def clean_positions(self):
        for stock in self.get_current_positions().keys():
            self.clean_position(stock)

    # 清空指定股票仓位
    def clean_position(self, stock):
        if self.has_position(stock):
            self.sell_value(stock, 0)
        else:
            self.log("股票 {1}({0}) 当前没有持仓,无法清空".format(stock, self.get_stock_name(stock)))

    # 指定股票是否拥有持仓
    def has_position(self, stock):
        return 0 < self.get_stock_total_amount(stock)

    # 半仓处理
    def half_positions(self):
        for stock in self.get_current_positions().keys():
            half_amount = self.get_half_amount(self.get_stock_sellable_amount(stock))
            if half_amount is not None:
                self.sell_amount(stock, half_amount)

    # 打印出绩效报表
    def print_report(self, end_date=None):
        start_date = self.get_today_date()
        if end_date is None or str(start_date) == str(end_date):
            win_rate = self.compute_win_rate()
            most_win = self.compute_most_win_percent()
            most_loss = self.compute_most_loss_percent()
            total_profit = self.compute_total_profit()
            starting_cash = self.get_starting_cash()
            total_yield = round(total_profit / starting_cash, 4)
            trade_days = self.get_trade_day_count()
            per_day_win_loss = 0 if 0 == total_profit or 0 == trade_days else round(total_profit / trade_days, 2)
            per_win_loss = 0 if 0 == total_profit or 0 == self.__g.trade_statics['total_count'] else round(
                total_profit / self.__g.trade_statics['total_count'])

            print "-"
            print '------------绩效报表------------'
            print '总资产: {0}, 本金: {1}, 盈利: {2}'.format(starting_cash   total_profit, starting_cash, total_profit)
            print '交易天数: {0}, 交易次数: {1}, 盈利次数: {2}'.format(trade_days,
                                                                        self.__g.trade_statics['total_count'],
                                                                        self.__g.trade_statics['success_count'])
            print '胜率: {2}, 日均盈亏: {0}, 每笔盈亏: {1}'.format(per_day_win_loss, per_win_loss, str(win_rate * 100)   str('%'))
            print '单次盈利最高: {0}, 盈利比例: {1}%({2})'.format(most_win['stock'], most_win['value'], most_win['date'])
            print '单次亏损最高: {0}, 亏损比例: {1}%({2})'.format(most_loss['stock'], most_loss['value'], most_loss['date'])
            print "总收益率: {0}%".format(total_yield * 100)
            print '--------------------------------'
            print "-"

    # 计算当前胜率
    def compute_win_rate(self):
        win_rate = 0
        if 0 < self.__g.trade_statics['total_count'] and 0 < self.__g.trade_statics['success_count']:
            win_rate = round(self.__g.trade_statics['success_count'] / float(self.__g.trade_statics['total_count']), 2)

        return win_rate

    # 计算当前最高盈利比例
    def compute_most_win_percent(self):
        result = {'stock': None, 'value': None, 'date': None}
        for statics in self.__g.trade_statics['win_percent']:
            if result['value'] is None:
                result['stock'] = statics[0]
                result['value'] = statics[1]
                result['date'] = statics[2]
            else:
                if statics[1] > result['value']:
                    result['stock'] = statics[0]
                    result['value'] = statics[1]
                    result['date'] = statics[2]

        return result

    # 计算当前最高亏损比例
    def compute_most_loss_percent(self):
        result = {'stock': None, 'value': None, 'date': None}
        for statics in self.__g.trade_statics['loss_percent']:
            if result['value'] is None:
                result['stock'] = statics[0]
                result['value'] = statics[1]
                result['date'] = statics[2]
            else:
                if statics[1] < result['value']:
                    result['stock'] = statics[0]
                    result['value'] = statics[1]
                    result['date'] = statics[2]

        return result

    # 计算当前总盈利金额
    def compute_total_profit(self):
        return self.get_portfolio_value() - self.get_starting_cash()

    def get_today_date(self):
        return self.get_context().current_dt.strftime("%Y-%m-%d")

    def get_today_week(self):
        return self.get_context().current_dt.isoweekday()

    # 获得股票上一分钟的价格
    def get_pre_minute_price(self, stock):
        __today = self.get_today_date()
        __hour = self.get_context().current_dt.hour
        __minute = self.get_context().current_dt.minute

        __start_time = "{0} {1}:{2}:00".format(__today, __hour, __minute)
        __end_time = "{0} {1}:{2}:59".format(__today, __hour, __minute)

        return get_price(stock, __start_time, __end_time, '1m', 'close')['close'][0]

    def get_current_positions(self):
        return self.get_context().portfolio.positions

    def get_stock_sellable_amount(self, stock):
        return self.get_context().portfolio.positions[stock].sellable_amount

    def get_stock_total_amount(self, stock):
        return self.get_context().portfolio.positions[stock].total_amount

    def get_stock_avg_cost(self, stock):
        return self.get_context().portfolio.positions[stock].avg_cost

    def get_stock_last_sell_price(self, stock):
        return self.get_context().portfolio.positions[stock].last_sale_price

    def get_starting_cash(self):
        return self.get_context().portfolio.starting_cash

    def get_trade_day_count(self):
        return self.__g.trade_statics['day_count']

    def get_portfolio_value(self):
        return self.get_context().portfolio.portfolio_value

    def get_context(self):
        return self.__context

    @staticmethod
    def get_stock_name(stock):
        return get_security_info(stock).display_name

    @staticmethod
    def get_stock_price(stock, interval=1):
        h = attribute_history(stock, interval, unit='1d', fields='close', skip_paused=True)
        return h['close'].values[0]

    @staticmethod
    def remove_paused(stock_list):
        current_data = get_current_data(stock_list)
        return [s for s in stock_list if not current_data[s].paused]

    @staticmethod
    def remove_st(stock_list):
        current_data = get_current_data(stock_list)
        return [s for s in stock_list if not current_data[s].is_st]

    @staticmethod
    def remove_cyb(stock_list):
        return [s for s in stock_list if not s.startswith("300")]

    # 取指定数量的一半
    @staticmethod
    def get_half_amount(amount):
        if 100 >= amount:
            return None

        return math.ceil(round(amount / 2) / 100) * 100

    # 预先获得要统计的数据,只有订单确实执行成功才记录
    def get_record_data(self, stock):
        total_amount = self.get_stock_total_amount(stock)
        avg_cost = self.get_stock_avg_cost(stock)
        last_price = self.get_stock_last_sell_price(stock)

        current_value = total_amount * last_price
        total_cost = total_amount * avg_cost
        percent = round((current_value - total_cost) / total_cost * 100, 2)

        return {'current_value': current_value, 'total_cost': total_cost, 'percent': percent}

    def log(self, msg):
        if self.__print_log:
            print(msg)

    def __add_day_count(self):
        self.__g.trade_statics['day_count']  = 1

全部回复

0/140

量化课程

    移动端课程