From 9478fff9b9d453cf9b5c30732b086ce5d02bec3c Mon Sep 17 00:00:00 2001 From: zengbin93 <1257391203@qq.com> Date: Sun, 26 Feb 2023 20:09:59 +0800 Subject: [PATCH] =?UTF-8?q?V0.9.9=20=E6=9B=B4=E6=96=B0=E4=B8=80=E6=89=B9?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=20(#127)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * 0.9.6 update * 0.9.6 update * 0.9.7 新增 tas_kdj_evc_V221201 信号 * 0.9.7 新增 tas_kdj_evc_V221201 信号 * 0.9.7 新增飞书消息发送 * 0.9.7 新增结合大盘择时或股票池的择时优化分析 * 0.9.7 update * 0.9.7 fix update_bi * 0.9.7 update * 0.9.7 update * 0.9.7 CzscTrader 新增 on_sig 模式 * 0.9.7 fix * 0.9.7 update * 0.9.7 增加持仓评价 * 0.9.7 支持自定义仓位集成方法 * 0.9.7 支持自定义仓位集成方法 * 0.9.7 update * 0.9.7 update * 0.9.7 update * 0.9.7 update * 0.9.7 update * 0.9.7 update * 0.9.7 update * 0.9.7 update * 0.9.7 fix test * 0.9.7 fix test * 0.9.7 update * 0.9.7 deprecated * 0.9.7 update * 0.9.7 update * 0.9.7 update * 0.9.7 update * 0.9.7 update * 0.9.7 fix bug * 0.9.7 update * 0.9.7 update * 0.9.7 新增 QmtTradeManager 用于实盘仿真 * 0.9.7 新增 QmtTradeManager 用于实盘仿真 * 0.9.7 update examples * 0.9.7 fix test * 0.9.8 start coding * 0.9.8 update * 0.9.8 update * 0.9.8 update * 0.9.8 新增假突破信号 * 0.9.8 新增K线重采样函数 * 0.9.8 新增K线重采样函数 * 0.9.8 fix bug * 0.9.8 QMT支持任意 base_freq * 0.9.8 update * 0.9.8 clean codes * 0.9.8 update vol signals * 0.9.8 clean codes * 0.9.8 update * 0.9.8 update * 0.9.8 update * 0.9.8 Position 对象新增 dump 方法 * 0.9.8 优化信号 * 0.9.8 新增 base_freq * update * 0.9.9 fist commit * 0.9.9 优化QMT对接代码 * 0.9.9 新增Tushare数据查看笔、分型的BI页面 * 0.9.9 新增信号函数 * 0.9.9 update * 0.9.9 优化TraderCallback * 0.9.9 优化TraderCallback * 0.9.9 新增 pos_changed 属性 * 0.9.9 优化策略基类 * 0.9.9 优化Position对象 * 0.9.9 新增两个信号函数 * 0.9.9 优化 Position 对象 * 0.9.9 新增plotly绘制K线图 * 0.9.9 新增plotly绘制K线图 * 0.9.9 新增股票beta策略 * 0.9.9 优化掘金终端对接代码 --- .github/workflows/pythonpackage.yml | 2 +- czsc/__init__.py | 6 +- czsc/connectors/gm_connector.py | 712 ++++++++++++++++++++ czsc/connectors/qmt_connector.py | 188 +++++- czsc/fsa/base.py | 2 +- czsc/gms/__init__.py | 7 - czsc/gms/gm_base.py | 466 ------------- czsc/gms/gm_stocks.py | 400 ----------- czsc/objects.py | 117 +++- czsc/signals/__init__.py | 3 + czsc/signals/bar.py | 35 + czsc/signals/cxt.py | 122 +++- czsc/signals/tas.py | 2 +- czsc/strategies.py | 139 +++- czsc/traders/base.py | 31 +- czsc/traders/performance.py | 8 +- czsc/utils/__init__.py | 1 + czsc/utils/bar_generator.py | 4 +- czsc/utils/plotly_plot.py | 151 +++++ examples/czsc_tushare_stream.py | 48 ++ examples/signals_dev/bar_vol_bs1_V230224.py | 79 +++ examples/signals_dev/cxt_bi_end_V230222.py | 125 ++++ examples/signals_dev/cxt_bi_end_V230224.py | 82 +++ examples/ts_check_signal_acc.py | 86 +-- examples/use_czsc_trader.py | 12 +- requirements.txt | 3 +- test/test_plotly_plot.py | 40 ++ test/test_strategy.py | 33 + test/test_trader_base.py | 7 +- 29 files changed, 1886 insertions(+), 1025 deletions(-) create mode 100644 czsc/connectors/gm_connector.py delete mode 100644 czsc/gms/__init__.py delete mode 100644 czsc/gms/gm_base.py delete mode 100644 czsc/gms/gm_stocks.py create mode 100644 czsc/utils/plotly_plot.py create mode 100644 examples/czsc_tushare_stream.py create mode 100644 examples/signals_dev/bar_vol_bs1_V230224.py create mode 100644 examples/signals_dev/cxt_bi_end_V230222.py create mode 100644 examples/signals_dev/cxt_bi_end_V230224.py create mode 100644 test/test_plotly_plot.py create mode 100644 test/test_strategy.py diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index 481711d04..72aa1f7f5 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -5,7 +5,7 @@ name: Python package on: push: - branches: [ master ] + branches: [ master, V0.9.9 ] pull_request: branches: [ master ] diff --git a/czsc/__init__.py b/czsc/__init__.py index 4855157cf..2c2d37e2a 100644 --- a/czsc/__init__.py +++ b/czsc/__init__.py @@ -15,12 +15,14 @@ from czsc.utils.cache import home_path, get_dir_size, empty_cache_path from czsc.traders import CzscTrader, CzscSignals, generate_czsc_signals from czsc.traders import PairsPerformance, combine_holds_and_pairs, combine_dates_and_pairs +from czsc.strategies import CzscStrategyBase +from czsc.utils import KlineChart, BarGenerator, resample_bars, dill_dump, dill_load, read_json, save_json -__version__ = "0.9.8" +__version__ = "0.9.9" __author__ = "zengbin93" __email__ = "zeng_bin8888@163.com" -__date__ = "20230210" +__date__ = "20230220" if envs.get_welcome(): diff --git a/czsc/connectors/gm_connector.py b/czsc/connectors/gm_connector.py new file mode 100644 index 000000000..13c7e7813 --- /dev/null +++ b/czsc/connectors/gm_connector.py @@ -0,0 +1,712 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2023/2/26 18:49 +describe: 掘金量化终端对接 +""" +import os +import dill +import czsc +import inspect +import pandas as pd +from loguru import logger +try: + from gm.api import * +except: + logger.warning(f"gm 模块没有安装") +from datetime import datetime, timedelta +from collections import OrderedDict +from typing import List +from czsc import CzscTrader, CzscStrategyBase +from czsc.data import freq_cn2gm +from czsc.utils import qywx as wx +from czsc.utils import BarGenerator +from czsc.objects import RawBar, Freq + + +dt_fmt = "%Y-%m-%d %H:%M:%S" +date_fmt = "%Y-%m-%d" + +assert czsc.__version__ >= "0.9.8" + + +def set_gm_token(token): + __file_token = os.path.join(os.path.expanduser("~"), "gm_token.txt") + with open(__file_token, 'w', encoding='utf-8') as f: + f.write(token) + + +file_token = os.path.join(os.path.expanduser("~"), "gm_token.txt") +if not os.path.exists(file_token): + print("{} 文件不存在,请单独启动一个 python 终端,调用 set_gm_token 方法创建该文件,再重新执行。".format(file_token)) +else: + gm_token = open(file_token, encoding="utf-8").read() + set_token(gm_token) + + +def is_trade_date(dt): + """判断 dt 时刻是不是交易日期""" + dt = pd.to_datetime(dt) + date_ = dt.strftime("%Y-%m-%d") + trade_dates = get_trading_dates(exchange='SZSE', start_date=date_, end_date=date_) + if trade_dates: + return True + else: + return False + + +def is_trade_time(dt): + """判断 dt 时刻是不是交易时间""" + dt = pd.to_datetime(dt) + date_ = dt.strftime("%Y-%m-%d") + trade_dates = get_trading_dates(exchange='SZSE', start_date=date_, end_date=date_) + if trade_dates and "15:00" > dt.strftime("%H:%M") > "09:30": + return True + else: + return False + + +def get_symbol_names(): + """获取股票市场标的列表,包括股票、指数等""" + df = get_instruments(exchanges='SZSE,SHSE', fields="symbol,sec_name", df=True) + shares = {row['symbol']: row['sec_name'] for _, row in df.iterrows()} + return shares + + +def format_kline(df, freq: Freq): + bars = [] + for i, row in df.iterrows(): + # amount 单位:元 + bar = RawBar(symbol=row['symbol'], id=i, freq=freq, dt=row['eob'], open=round(row['open'], 2), + close=round(row['close'], 2), high=round(row['high'], 2), + low=round(row['low'], 2), vol=row['volume'], amount=row['amount']) + bars.append(bar) + return bars + + +def get_kline(symbol, end_time, freq='60s', count=33000, adjust=ADJUST_PREV): + """获取K线数据 + + :param symbol: 标的代码 + :param end_time: 结束时间 + :param freq: K线周期 + :param count: K线数量 + :param adjust: 复权方式 + :return: + """ + if isinstance(end_time, datetime): + end_time = end_time.strftime(dt_fmt) + + exchange = symbol.split(".")[0] + freq_map_ = {'60s': Freq.F1, '300s': Freq.F5, '900s': Freq.F15, '1800s': Freq.F30, + '3600s': Freq.F60, '1d': Freq.D} + + if exchange in ["SZSE", "SHSE"]: + df = history_n(symbol=symbol, frequency=freq, end_time=end_time, adjust=adjust, + fields='symbol,eob,open,close,high,low,volume,amount', count=count, df=True) + else: + df = history_n(symbol=symbol, frequency=freq, end_time=end_time, adjust=adjust, + fields='symbol,eob,open,close,high,low,volume,amount,position', count=count, df=True) + return format_kline(df, freq_map_[freq]) + + +def get_init_bg(symbol: str, + end_dt: [str, datetime], + base_freq: str, + freqs: List[str], + max_count=1000, + adjust=ADJUST_PREV): + """获取 symbol 的初始化 bar generator""" + if isinstance(end_dt, str): + end_dt = pd.to_datetime(end_dt, utc=True) + end_dt = end_dt.tz_convert('dateutil/PRC') + # 时区转换之后,要减去8个小时才是设置的时间 + end_dt = end_dt - timedelta(hours=8) + else: + assert end_dt.tzinfo._filename == 'PRC' + + delta_days = 180 + last_day = (end_dt - timedelta(days=delta_days)).replace(hour=16, minute=0) + + bg = BarGenerator(base_freq, freqs, max_count) + if "周线" in freqs or "月线" in freqs: + d_bars = get_kline(symbol, last_day, freq_cn2gm["日线"], count=5000, adjust=adjust) + bgd = BarGenerator("日线", ['周线', '月线', '季线', '年线']) + for b in d_bars: + bgd.update(b) + else: + bgd = None + + for freq in bg.bars.keys(): + if freq in ['周线', '月线', '季线', '年线']: + bars_ = bgd.bars[freq] + else: + bars_ = get_kline(symbol, last_day, freq_cn2gm[freq], max_count, adjust) + bg.init_freq_bars(freq, bars_) + print(f"{symbol} - {freq} - {len(bg.bars[freq])} - last_dt: {bg.bars[freq][-1].dt} - last_day: {last_day}") + + bars2 = get_kline(symbol, end_dt, freq_cn2gm[base_freq], + count=int(240 / int(base_freq.strip('分钟')) * delta_days)) + data = [x for x in bars2 if x.dt > last_day] + assert len(data) > 0 + print(f"{symbol}: bar generator 最新时间 {bg.bars[base_freq][-1].dt.strftime(dt_fmt)},还有{len(data)}行数据需要update") + return bg, data + + +order_side_map = {OrderSide_Unknown: '其他', OrderSide_Buy: '买入', OrderSide_Sell: '卖出'} +order_status_map = { + OrderStatus_Unknown: "其他", + OrderStatus_New: "已报", + OrderStatus_PartiallyFilled: "部成", + OrderStatus_Filled: "已成", + OrderStatus_Canceled: "已撤", + OrderStatus_PendingCancel: "待撤", + OrderStatus_Rejected: "已拒绝", + OrderStatus_Suspended: "挂起(无效)", + OrderStatus_PendingNew: "待报", + OrderStatus_Expired: "已过期", +} +pos_side_map = {PositionSide_Unknown: '其他', PositionSide_Long: '多头', PositionSide_Short: '空头'} +pos_effect_map = { + PositionEffect_Unknown: '其他', + PositionEffect_Open: '开仓', + PositionEffect_Close: '平仓', + PositionEffect_CloseToday: '平今仓', + PositionEffect_CloseYesterday: '平昨仓', +} +exec_type_map = { + ExecType_Unknown: "其他", + ExecType_New: "已报", + ExecType_Canceled: "已撤销", + ExecType_PendingCancel: "待撤销", + ExecType_Rejected: "已拒绝", + ExecType_Suspended: "挂起", + ExecType_PendingNew: "待报", + ExecType_Expired: "过期", + ExecType_Trade: "成交(有效)", + ExecType_OrderStatus: "委托状态", + ExecType_CancelRejected: "撤单被拒绝(有效)", +} + + +def on_order_status(context, order): + """订单状态更新通知 + + https://www.myquant.cn/docs/python/python_object_trade#007ae8f5c7ec5298 + + :param context: + :param order: + :return: + """ + if not is_trade_time(context.now): + return + + symbol = order.symbol + latest_dt = context.now.strftime("%Y-%m-%d %H:%M:%S") + + msg = f"订单状态更新通知:\n{'*' * 31}\n" \ + f"更新时间:{latest_dt}\n" \ + f"标的名称:{symbol} {context.stocks.get(symbol, '无名')}\n" \ + f"操作类型:{order_side_map[order.side]}{pos_effect_map[order.position_effect]}\n" \ + f"下单价格:{round(order.price, 2)}\n" \ + f"最新状态:{order_status_map[order.status]}\n" \ + f"委托(股):{int(order.volume)}\n" \ + f"已成(股):{int(order.filled_volume)}\n" \ + f"均价(元):{round(order.filled_vwap, 2)}" + + logger.info(msg.replace("\n", " - ").replace('*', "")) + if context.mode != MODE_BACKTEST and order.status in [1, 3, 5, 8, 9, 12]: + wx.push_text(content=str(msg), key=context.wx_key) + + +def on_execution_report(context, execrpt): + """响应委托被执行事件,委托成交或者撤单拒绝后被触发。 + + https://www.myquant.cn/docs/python/python_trade_event#on_execution_report%20-%20%E5%A7%94%E6%89%98%E6%89%A7%E8%A1%8C%E5%9B%9E%E6%8A%A5%E4%BA%8B%E4%BB%B6 + https://www.myquant.cn/docs/python/python_object_trade#ExecRpt%20-%20%E5%9B%9E%E6%8A%A5%E5%AF%B9%E8%B1%A1 + + :param context: + :param execrpt: + :return: + """ + if not is_trade_time(context.now): + return + + latest_dt = context.now.strftime(dt_fmt) + msg = f"委托订单被执行通知:\n{'*' * 31}\n" \ + f"时间:{latest_dt}\n" \ + f"标的:{execrpt.symbol}\n" \ + f"名称:{context.stocks.get(execrpt.symbol, '无名')}\n" \ + f"方向:{order_side_map[execrpt.side]}{pos_effect_map[execrpt.position_effect]}\n" \ + f"成交量:{int(execrpt.volume)}\n" \ + f"成交价:{round(execrpt.price, 2)}\n" \ + f"执行回报类型:{exec_type_map[execrpt.exec_type]}" + + logger.info(msg.replace("\n", " - ").replace('*', "")) + if context.mode != MODE_BACKTEST and execrpt.exec_type in [1, 5, 6, 8, 12, 19]: + wx.push_text(content=str(msg), key=context.wx_key) + + +def on_backtest_finished(context, indicator): + """回测结束回调函数 + + :param context: + :param indicator: + https://www.myquant.cn/docs/python/python_object_trade#bd7f5adf22081af5 + :return: + """ + wx_key = context.wx_key + symbols = context.symbols + data_path = context.data_path + + logger.info(str(indicator)) + logger.info("回测结束 ... ") + cash = context.account().cash + + for k, v in indicator.items(): + if isinstance(v, float): + indicator[k] = round(v, 4) + + row = OrderedDict({ + "标的数量": len(context.symbols_info.keys()), + "开始时间": context.backtest_start_time, + "结束时间": context.backtest_end_time, + "累计收益": indicator['pnl_ratio'], + "最大回撤": indicator['max_drawdown'], + "年化收益": indicator['pnl_ratio_annual'], + "夏普比率": indicator['sharp_ratio'], + "盈利次数": indicator['win_count'], + "亏损次数": indicator['lose_count'], + "交易胜率": indicator['win_ratio'], + "累计出入金": int(cash['cum_inout']), + "累计交易额": int(cash['cum_trade']), + "累计手续费": int(cash['cum_commission']), + "累计平仓收益": int(cash['cum_pnl']), + "净收益": int(cash['pnl']), + }) + sdt = pd.to_datetime(context.backtest_start_time).strftime('%Y%m%d') + edt = pd.to_datetime(context.backtest_end_time).strftime('%Y%m%d') + file_xlsx = os.path.join(data_path, f'{context.name}_{sdt}_{edt}.xlsx') + file = pd.ExcelWriter(file_xlsx, mode='w') + + dfe = pd.DataFrame({"指标": list(row.keys()), "值": list(row.values())}) + dfe.to_excel(file, sheet_name='回测表现', index=False) + + logger.info("回测结果:{}".format(row)) + content = "" + for k, v in row.items(): + content += "{}: {}\n".format(k, v) + wx.push_text(content=content, key=wx_key) + + +def on_error(context, code, info): + if not is_trade_time(context.now): + return + + msg = "{} - {}".format(code, info) + logger.warning(msg) + if context.mode != MODE_BACKTEST: + wx.push_text(content=msg, key=context.wx_key) + + +def on_account_status(context, account): + """响应交易账户状态更新事件,交易账户状态变化时被触发 + + https://www.myquant.cn/docs/python/python_trade_event#4f07d24fc4314e3c + """ + status = account['status'] + if status['state'] == 3: + return + + if not is_trade_time(context.now): + return + + msg = f"{str(account)}" + logger.warning(msg) + if context.mode != MODE_BACKTEST: + wx.push_text(content=msg, key=context.wx_key) + + +def is_order_exist(context, symbol, side) -> bool: + """判断同方向订单是否已经存在 + + :param context: + :param symbol: 交易标的 + :param side: 交易方向 + :return: bool + """ + uo = context.unfinished_orders + if not uo: + return False + else: + for o in uo: + if o.symbol == symbol and o.side == side: + context.logger.info("同类型订单已存在:{} - {}".format(symbol, side)) + return True + return False + + +def cancel_timeout_orders(context, max_m=30): + """实盘仿真,撤销挂单时间超过 max_m 分钟的订单。 + + :param context: + :param max_m: 最大允许挂单分钟数 + :return: + """ + for u_order in context.unfinished_orders: + if context.now - u_order.created_at >= timedelta(minutes=max_m): + order_cancel(u_order) + + +def gm_take_snapshot(gm_symbol, end_dt=None, file_html=None, + freqs=('1分钟', '5分钟', '15分钟', '30分钟', '60分钟', '日线', '周线', '月线'), + adjust=ADJUST_PREV, max_count=1000): + """使用掘金的数据对任意标的、任意时刻的状态进行快照 + + :param gm_symbol: + :param end_dt: + :param file_html: + :param freqs: + :param adjust: + :param max_count: + :return: + """ + if not end_dt: + end_dt = datetime.now().strftime(dt_fmt) + + bg, data = get_init_bg(gm_symbol, end_dt, freqs[0], freqs[1:], max_count, adjust) + ct = CzscTrader(bg) + for bar in data: + ct.update(bar) + + if file_html: + ct.take_snapshot(file_html) + print(f'saved into {file_html}') + else: + ct.open_in_browser() + return ct + + +def save_traders(context): + """实盘:保存交易员快照""" + if context.now.isoweekday() > 5: + print(f"save_traders: {context.now} 不是交易时间") + return + + for symbol in context.symbols_info.keys(): + trader: CzscTrader = context.symbols_info[symbol]['trader'] + if context.mode != MODE_BACKTEST: + file_trader = os.path.join(context.data_path, f'traders/{symbol}.ct') + dill.dump(trader, open(file_trader, 'wb')) + + +indices = { + "上证指数": 'SHSE.000001', + "上证50": 'SHSE.000016', + "沪深300": "SHSE.000300", + "中证1000": "SHSE.000852", + "中证500": "SHSE.000905", + + "深证成指": "SZSE.399001", + "创业板指数": 'SZSE.399006', + "深次新股": "SZSE.399678", + "中小板指": "SZSE.399005", + "国证2000": "SZSE.399303", + "小盘成长": "SZSE.399376", + "小盘价值": "SZSE.399377", +} + + +def get_index_shares(name, end_date=None): + """获取某一交易日的指数成分股列表 + + symbols = get_index_shares("上证50", "2019-01-01 09:30:00") + """ + if not end_date: + end_date = datetime.now().strftime(date_fmt) + else: + end_date = pd.to_datetime(end_date).strftime(date_fmt) + constituents = get_history_constituents(indices[name], end_date, end_date)[0] + symbol_list = [k for k, v in constituents['constituents'].items()] + return list(set(symbol_list)) + + +def on_bar(context, bars): + """订阅K线回调函数""" + context.unfinished_orders = get_unfinished_orders() + cancel_timeout_orders(context, max_m=30) + + for bar in bars: + symbol = bar['symbol'] + trader: CzscTrader = context.symbols_info[symbol]['trader'] + + # 确保数据更新到最新时刻 + base_freq = trader.base_freq + bars = context.data(symbol=symbol, frequency=freq_cn2gm[base_freq], count=100, + fields='symbol,eob,open,close,high,low,volume,amount') + bars = format_kline(bars, freq=trader.bg.freq_map[base_freq]) + bars_new = [x for x in bars if x.dt > trader.bg.bars[base_freq][-1].dt] + if bars_new: + for bar_ in bars_new: + trader.update(bar_) + + # sync_long_position(context, trader) + + +def report_account_status(context): + """报告账户持仓状态""" + if context.now.isoweekday() > 5: + return + + latest_dt = context.now.strftime(dt_fmt) + account = context.account(account_id=context.account_id) + cash = account.cash + positions = account.positions() + + logger.info("=" * 30 + f" 账户状态【{latest_dt}】 " + "=" * 30) + cash_report = f"净值:{int(cash.nav)},可用资金:{int(cash.available)}," \ + f"浮动盈亏:{int(cash.fpnl)},标的数量:{len(positions)}" + logger.info(cash_report) + + for p in positions: + p_report = f"标的:{p.symbol},名称:{context.stocks.get(p.symbol, '无名')}," \ + f"数量:{p.volume},成本:{round(p.vwap, 2)},方向:{p.side}," \ + f"当前价:{round(p.price, 2)},成本市值:{int(p.volume * p.vwap)}," \ + f"建仓时间:{p.created_at.strftime(dt_fmt)}" + logger.info(p_report) + + # 实盘或仿真,推送账户信息到企业微信 + if context.mode != MODE_BACKTEST: + + msg = f"股票账户状态报告\n{'*' * 31}\n" + msg += f"账户净值:{int(cash.nav)}\n" \ + f"持仓市值:{int(cash.market_value)}\n" \ + f"可用资金:{int(cash.available)}\n" \ + f"浮动盈亏:{int(cash.fpnl)}\n" \ + f"标的数量:{len(positions)}\n" + wx.push_text(msg.strip("\n *"), key=context.wx_key) + + results = [] + for symbol, info in context.symbols_info.items(): + name = context.stocks.get(symbol, '无名') + trader: CzscTrader = context.symbols_info[symbol]['trader'] + p = account.position(symbol=symbol, side=PositionSide_Long) + + row = {'交易标的': symbol, '标的名称': name, + '最新时间': trader.end_dt.strftime(dt_fmt), + '最新价格': trader.latest_price} + + if "日线" in trader.kas.keys(): + bar1, bar2 = trader.kas['日线'].bars_raw[-2:] + row.update({'昨日收盘': round(bar1.close, 2), + '今日涨幅': round(bar2.close / bar1.close - 1, 4)}) + + if p: + row.update({"实盘持仓数量": p.volume, + "实盘持仓成本": round(p.vwap, 2), + "实盘持仓市值": int(p.volume * p.vwap)}) + else: + row.update({"实盘持仓数量": 0, "实盘持仓成本": 0, "实盘持仓市值": 0}) + + results.append(row) + + df = pd.DataFrame(results) + df.sort_values(['多头持仓', '多头收益'], ascending=False, inplace=True, ignore_index=True) + file_xlsx = os.path.join(context.data_path, f"holds_{context.now.strftime('%Y%m%d_%H%M')}.xlsx") + df.to_excel(file_xlsx, index=False) + wx.push_file(file_xlsx, key=context.wx_key) + os.remove(file_xlsx) + + # 提示非策略交易标的持仓 + process_out_of_symbols(context) + + +def sync_long_position(context, trader: CzscTrader): + """同步多头仓位到交易账户""" + if not trader.positions: + return + + symbol = trader.symbol + name = context.stocks.get(symbol, "无名标的") + long_pos = trader.get_ensemble_pos(method='vote') + max_sym_pos = context.symbols_info[symbol]['max_sym_pos'] # 最大标的仓位 + if context.mode == MODE_BACKTEST: + account = context.account() + else: + account = context.account(account_id=context.account_id) + cash = account.cash + + price = trader.latest_price + sym_position = account.position(symbol, PositionSide_Long) + if long_pos == 0 and not sym_position: + # 如果多头仓位为0且掘金账户没有对应持仓,直接退出 + return + + if long_pos == 0 and sym_position and sym_position.volume > 0: + # 如果多头仓位为0且掘金账户依然还有持仓,清掉仓位 + order_target_volume(symbol=symbol, volume=0, position_side=PositionSide_Long, + order_type=OrderType_Limit, price=price, account=account.id) + return + + if not trader.pos_changed: + return + + assert long_pos > 0 + if cash.available < price * 120: + logger.info(f"{context.now} {symbol} {name} 可用资金不足,无法开多仓,最少所需资金{int(price * 120)}元") + return + + if is_order_exist(context, symbol, PositionSide_Long): + logger.info(f"{context.now} {symbol} {name} 同方向订单已存在") + return + + percent = max_sym_pos * long_pos + volume = int((cash.nav * percent / price // 100) * 100) # 单位:股 + order_target_volume(symbol=symbol, volume=volume, position_side=PositionSide_Long, + order_type=OrderType_Limit, price=price, account=account.id) + + +def process_out_of_symbols(context): + """实盘:处理不在交易列表的持仓股""" + if context.now.isoweekday() > 5: + print(f"process_out_of_symbols: {context.now} 不是交易时间") + return + + if context.mode == MODE_BACKTEST: + print(f"process_out_of_symbols: 回测模式下不需要执行") + return + + account = context.account(account_id=context.account_id) + positions = account.positions(symbol="", side=PositionSide_Long) + + oos = [] + for p in positions: + symbol = p.symbol + if p.volume > 0 and p.symbol not in context.symbols_info.keys(): + oos.append(symbol) + # order_target_volume(symbol=symbol, volume=0, position_side=PositionSide_Long, + # order_type=OrderType_Limit, price=p.price, account=account.id) + if oos: + wx.push_text(f"不在交易列表的持仓股:{', '.join(oos)}", context.wx_key) + + +def init_context_universal(context, name): + """通用 context 初始化:1、创建文件目录和日志记录 + + :param context: + :param name: 交易策略名称,建议使用英文 + """ + path_gm_logs = os.environ.get('path_gm_logs', None) + if context.mode == MODE_BACKTEST: + data_path = os.path.join(path_gm_logs, f"backtest/{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}") + else: + data_path = os.path.join(path_gm_logs, f"realtime/{name}") + os.makedirs(data_path, exist_ok=True) + + context.name = name + context.data_path = data_path + context.stocks = get_symbol_names() + + logger.add(os.path.join(data_path, "gm_trader.log"), rotation="500MB", + encoding="utf-8", enqueue=True, retention="1 days") + logger.info("运行配置:") + logger.info(f"data_path = {data_path}") + + if context.mode == MODE_BACKTEST: + logger.info("backtest_start_time = " + str(context.backtest_start_time)) + logger.info("backtest_end_time = " + str(context.backtest_end_time)) + + +def init_context_env(context): + """通用 context 初始化:2、读入环境变量 + + :param context: + """ + context.wx_key = os.environ['wx_key'] + context.account_id = os.environ.get('account_id', '') + if context.mode != MODE_BACKTEST: + assert len(context.account_id) > 10, "非回测模式,必须设置 account_id " + + # 单个标的仓位控制[0, 1],按资金百分比控制,1表示满仓,仅在开仓的时候控制 + context.max_sym_pos = float(os.environ['max_sym_pos']) + assert 0 <= context.max_sym_pos <= 1 + + logger.info(f"环境变量读取结果如下:") + logger.info(f"单标的控制:context.max_sym_pos = {context.max_sym_pos}") + + +def init_context_traders(context, symbols: List[str], strategy): + """通用 context 初始化:3、为每个标的创建 trader + + :param context: + :param symbols: 交易标的列表 + :param strategy: 交易策略 + :return: + """ + assert issubclass(strategy, CzscStrategyBase), "strategy 必须是 CzscStrategyBase 的子类" + with open(os.path.join(context.data_path, f'{strategy.__name__}.txt'), mode='w') as f: + f.write(inspect.getsource(strategy)) + + tactic = strategy(symbol="000001") + base_freq, freqs = tactic.sorted_freqs[0], tactic.sorted_freqs[1:] + frequency = freq_cn2gm[base_freq] + unsubscribe(symbols='*', frequency=frequency) + + data_path = context.data_path + logger.info(f"输入交易标的数量:{len(symbols)}") + logger.info(f"交易员的周期列表:base_freq = {base_freq}; freqs = {freqs}") + + os.makedirs(os.path.join(data_path, 'traders'), exist_ok=True) + symbols_info = {symbol: dict() for symbol in symbols} + for symbol in symbols: + try: + symbols_info[symbol]['max_sym_pos'] = context.max_sym_pos + file_trader = os.path.join(data_path, f'traders/{symbol}.cat') + + if os.path.exists(file_trader) and context.mode != MODE_BACKTEST: + trader: CzscTrader = dill.load(open(file_trader, 'rb')) + logger.info(f"{symbol} Loaded Trader from {file_trader}") + + else: + tactic = strategy(symbol=symbol) + bg, data = get_init_bg(symbol, context.now, base_freq, freqs, 1000, ADJUST_PREV) + trader = CzscTrader(bg, get_signals=tactic.get_signals, positions=tactic.positions) + dill.dump(trader, open(file_trader, 'wb')) + + symbols_info[symbol]['trader'] = trader + logger.info("{} Trader 构建成功,最新时间:{},多仓:{}".format(symbol, trader.end_dt, trader.long_pos.pos)) + + except Exception as e1: + del symbols_info[symbol] + logger.exception(f"{e1}:{symbol} - {context.stocks.get(symbol, '无名')} 初始化失败,当前时间:{context.now}") + + subscribe(",".join(symbols_info.keys()), frequency=frequency, count=300, wait_group=False) + logger.info(f"订阅成功数量:{len(symbols_info)}") + logger.info(f"交易标的配置:{symbols_info}") + context.symbols_info = symbols_info + + +def init_context_schedule(context): + """通用 context 初始化:设置定时任务""" + schedule(schedule_func=report_account_status, date_rule='1d', time_rule='09:31:00') + schedule(schedule_func=report_account_status, date_rule='1d', time_rule='10:01:00') + schedule(schedule_func=report_account_status, date_rule='1d', time_rule='10:31:00') + schedule(schedule_func=report_account_status, date_rule='1d', time_rule='11:01:00') + schedule(schedule_func=report_account_status, date_rule='1d', time_rule='11:31:00') + schedule(schedule_func=report_account_status, date_rule='1d', time_rule='13:01:00') + schedule(schedule_func=report_account_status, date_rule='1d', time_rule='13:31:00') + schedule(schedule_func=report_account_status, date_rule='1d', time_rule='14:01:00') + schedule(schedule_func=report_account_status, date_rule='1d', time_rule='14:31:00') + schedule(schedule_func=report_account_status, date_rule='1d', time_rule='15:01:00') + + # 以下是 实盘/仿真 模式下的定时任务 + if context.mode != MODE_BACKTEST: + schedule(schedule_func=save_traders, date_rule='1d', time_rule='11:40:00') + schedule(schedule_func=save_traders, date_rule='1d', time_rule='15:10:00') + # schedule(schedule_func=realtime_check_index_status, date_rule='1d', time_rule='17:30:00') + # schedule(schedule_func=process_out_of_symbols, date_rule='1d', time_rule='09:40:00') + + + + diff --git a/czsc/connectors/qmt_connector.py b/czsc/connectors/qmt_connector.py index 797f83e2e..bf690d429 100644 --- a/czsc/connectors/qmt_connector.py +++ b/czsc/connectors/qmt_connector.py @@ -5,6 +5,7 @@ create_dt: 2022/12/31 16:03 describe: QMT 量化交易平台接口 """ +import os import time import random import pandas as pd @@ -22,6 +23,8 @@ from xtquant.xttrader import XtQuantTrader, XtQuantTraderCallback from xtquant.xttype import StockAccount +dt_fmt = "%Y-%m-%d %H:%M:%S" + def format_stock_kline(kline: pd.DataFrame, freq: Freq) -> List[RawBar]: """QMT A股市场K线数据转换 @@ -164,6 +167,7 @@ def get_symbols(step): class TraderCallback(XtQuantTraderCallback): """基础回调类,主要是一些日志和IM通知功能""" + def __init__(self, **kwargs): super(TraderCallback, self).__init__() self.kwargs = kwargs @@ -175,15 +179,36 @@ def __init__(self, **kwargs): self.im = None self.members = None + # 推送模式:detail-详细模式,summary-汇总模式 + self.feishu_push_mode = kwargs.get('feishu_push_mode', 'detail') + file_log = kwargs.get('file_log', None) if file_log: logger.add(file_log, rotation='1 day', encoding='utf-8', enqueue=True) self.file_log = file_log logger.info(f"TraderCallback init: {kwargs}") + def push_message(self, msg: str, msg_type='text'): + """批量推送消息""" + if self.im and self.members: + for member in self.members: + try: + if msg_type == 'text': + self.im.send_text(msg, member) + elif msg_type == 'image': + self.im.send_image(msg, member) + elif msg_type == 'file': + self.im.send_file(msg, member) + else: + logger.error(f"不支持的消息类型:{msg_type}") + except Exception as e: + logger.error(f"推送消息失败:{e}") + def on_disconnected(self): """连接断开""" + logger.info("connection lost") + self.push_message("连接断开") def on_stock_order(self, order): """委托回报推送 @@ -192,6 +217,14 @@ def on_stock_order(self, order): """ logger.info(f"on order callback: {order.stock_code} {order.order_status} {order.order_sysid}") + if self.feishu_push_mode == 'detail': + msg = f"委托回报通知:\n{'*' * 31}\n" \ + f"时间:{datetime.now().strftime(dt_fmt)}\n" \ + f"标的:{order.stock_code}\n" \ + f"方向:{'做多' if order.order_type == 23 else '平多'}\n" \ + f"委托数量:{int(order.order_volume)}" + self.push_message(msg, msg_type='text') + def on_stock_asset(self, asset): """资金变动推送 @@ -199,6 +232,14 @@ def on_stock_asset(self, asset): """ logger.info(f"on asset callback: {asset.account_id} {asset.cash} {asset.total_asset}") + if self.feishu_push_mode == 'detail': + msg = f"资金变动通知: \n{'*' * 31}\n" \ + f"时间:{datetime.now().strftime(dt_fmt)}\n" \ + f"账户ID: {asset.account_id} \n" \ + f"可用资金:{asset.cash} \n" \ + f"总资产:{asset.total_asset}" + self.push_message(msg, msg_type='text') + def on_stock_trade(self, trade): """成交变动推送 @@ -206,6 +247,15 @@ def on_stock_trade(self, trade): """ logger.info(f"on trade callback: {trade.account_id} {trade.stock_code} {trade.order_id}") + if self.feishu_push_mode == 'detail': + msg = f"成交变动通知:\n{'*' * 31}\n" \ + f"时间:{datetime.now().strftime(dt_fmt)}\n" \ + f"标的:{trade.stock_code}\n" \ + f"方向:{'开多' if trade.order_type == 23 else '平多'}\n" \ + f"成交量:{int(trade.traded_volume)}\n" \ + f"成交价:{round(trade.traded_price, 2)}" + self.push_message(msg, msg_type='text') + def on_stock_position(self, position): """持仓变动推送 @@ -213,11 +263,24 @@ def on_stock_position(self, position): """ logger.info(f"on position callback: {position.stock_code} {position.volume}") + if self.feishu_push_mode == 'detail': + msg = f"持仓变动通知: \n{'*' * 31}\n" \ + f"时间:{datetime.now().strftime(dt_fmt)}\n" \ + f"标的:{position.stock_code}\n" \ + f"成交量:{position.volume}" + self.push_message(msg, msg_type='text') + def on_order_error(self, order_error): """委托失败推送 :param order_error:XtOrderError 对象 """ + msg = f"委托失败通知: \n{'*' * 31}\n" \ + f"时间:{datetime.now().strftime(dt_fmt)}\n" \ + f"订单编号:{order_error.order_id}\n" \ + f"错误编码:{order_error.error_id}\n" \ + f"失败原因:{order_error.error_msg}" + self.push_message(msg, msg_type='text') logger.info(f"on order_error callback: {order_error.order_id} {order_error.error_id} {order_error.error_msg}") def on_cancel_error(self, cancel_error): @@ -225,6 +288,12 @@ def on_cancel_error(self, cancel_error): :param cancel_error: XtCancelError 对象 """ + msg = f"撤单失败通知: \n{'*' * 31}\n" \ + f"时间:{datetime.now().strftime(dt_fmt)}\n" \ + f"订单编号:{cancel_error.order_id}\n" \ + f"错误编码:{cancel_error.error_id}\n" \ + f"失败原因:{cancel_error.error_msg}" + self.push_message(msg, msg_type='text') logger.info(f"{cancel_error.order_id} {cancel_error.error_id} {cancel_error.error_msg}") def on_order_stock_async_response(self, response): @@ -232,6 +301,12 @@ def on_order_stock_async_response(self, response): :param response: XtOrderResponse 对象 """ + msg = f"异步下单回报推送: \n{'*' * 31}\n" \ + f"时间:{datetime.now().strftime(dt_fmt)}\n" \ + f"资金账号:{response.account_id}\n" \ + f"订单编号:{response.order_id}\n" \ + f"策略名称:{response.strategy_name}" + self.push_message(msg, msg_type='text') logger.info(f"on_order_stock_async_response: {response.order_id} {response.seq}") def on_account_status(self, status): @@ -239,12 +314,24 @@ def on_account_status(self, status): :param status: XtAccountStatus 对象 """ - logger.info(f"on_account_status: {status.account_id} {status.account_type} {status.status}") + status_map = {xtconstant.ACCOUNT_STATUS_OK: '正常', + xtconstant.ACCOUNT_STATUS_WAITING_LOGIN: '连接中', + xtconstant.ACCOUNT_STATUSING: '登陆中', + xtconstant.ACCOUNT_STATUS_FAIL: '失败'} + msg = f"账户状态变化推送:\n{'*' * 31}\n" \ + f"时间:{datetime.now().strftime(dt_fmt)}\n" \ + f"账户ID:{status.account_id}\n" \ + f"账号类型:{'证券账户' if status.account_type == 2 else '其他'}\n" \ + f"账户状态:{status_map[status.status]}\n" + + logger.info(f"账户ID: {status.account_id} " + f"账号类型:{'证券账户' if status.account_type == 2 else '其他'} " + f"账户状态:{status_map[status.status]}") + self.push_message(msg, msg_type='text') class QmtTradeManager: - """QMT交易管理器""" - + """QMT交易管理器(这是一个案例性质的存在,真正实盘的时候请参考这个,根据自己的逻辑重新实现)""" def __init__(self, mini_qmt_dir, account_id, **kwargs): """ @@ -253,17 +340,19 @@ def __init__(self, mini_qmt_dir, account_id, **kwargs): :param kwargs: """ - self.symbols = kwargs.get('symbols', []) # 交易标的列表 + self.symbols = kwargs.get('symbols', []) # 交易标的列表 self.strategy = kwargs.get('strategy', []) # 交易策略 self.symbol_max_pos = kwargs.get('symbol_max_pos', 0.5) # 每个标的最大持仓比例 self.trade_sdt = kwargs.get('trade_sdt', '20220601') # 交易跟踪开始日期 self.mini_qmt_dir = mini_qmt_dir self.account_id = account_id self.base_freq = self.strategy(symbol='symbol').sorted_freqs[0] - self.delta_days = int(kwargs.get('delta_days', 1)) # 定时执行获取的K线天数 + self.delta_days = int(kwargs.get('delta_days', 1)) # 定时执行获取的K线天数 + self.forbidden_symbols = kwargs.get('forbidden_symbols', []) # 禁止交易的品种列表 self.session = random.randint(10000, 20000) - self.xtt = XtQuantTrader(mini_qmt_dir, session=self.session, callback=TraderCallback()) + self.callback = TraderCallback(**kwargs.get('callback_params', {})) + self.xtt = XtQuantTrader(mini_qmt_dir, session=self.session, callback=self.callback) self.acc = StockAccount(account_id, 'STOCK') self.xtt.start() self.xtt.connect() @@ -317,6 +406,9 @@ def is_allow_open(self, symbol, price): :param price: 股票现价 :return: True 允许开仓,False 不允许开仓 """ + if symbol in self.forbidden_symbols: + return False + # 如果 未成交的开仓委托单 存在,不允许开仓 if self.is_order_exist(symbol, order_type=23): logger.warning(f"存在未成交的开仓委托单,symbol={symbol}") @@ -338,6 +430,24 @@ def is_allow_open(self, symbol, price): return True + def is_allow_exit(self, symbol): + """判断是否允许平仓 + + :param symbol: 股票代码 + :return: True 允许开仓,False 不允许开仓 + """ + if symbol in self.forbidden_symbols: + return False + + pos = self.query_stock_positions().get(symbol) + if not pos: + return False + + if pos.can_use_volume <= 0: + return False + + return True + def query_stock_positions(self): """查询股票市场的持仓单 @@ -370,7 +480,7 @@ def send_stock_order(self, **kwargs): """ stock_code = kwargs.get('stock_code') order_type = kwargs.get('order_type') - order_volume = kwargs.get('order_volume') + order_volume = kwargs.get('order_volume') # 委托数量, 股票以'股'为单位, 债券以'张'为单位 price_type = kwargs.get('price_type', xtconstant.LATEST_PRICE) price = kwargs.get('price', 0) strategy_name = kwargs.get('strategy_name', "程序下单") @@ -384,7 +494,7 @@ def send_stock_order(self, **kwargs): order_volume = order_volume // 100 * 100 assert self.xtt.connected, "交易服务器连接断开" - _id = self.xtt.order_stock(self.acc, stock_code, order_type, order_volume, + _id = self.xtt.order_stock(self.acc, stock_code, order_type, int(order_volume), price_type, price, strategy_name, order_remark) return _id @@ -411,7 +521,7 @@ def update_traders(self): self.send_stock_order(stock_code=symbol, order_type=23, order_volume=order_volume) # 平多头 - if trader.get_ensemble_pos(method='vote') == 0 and symbol in holds.keys(): + if trader.get_ensemble_pos(method='vote') == 0 and self.is_allow_exit(symbol): order_volume = holds[symbol].can_use_volume self.send_stock_order(stock_code=symbol, order_type=24, order_volume=order_volume) @@ -420,14 +530,62 @@ def update_traders(self): else: logger.info(f"{symbol} 没有需要更新的K线,最新的K线时间是 {trader.end_dt}") - pos_info = {x.name: x.pos for x in trader.positions} - logger.info(f"{symbol} trader pos:{pos_info} | ensemble_pos: {trader.get_ensemble_pos('mean')}") + if trader.get_ensemble_pos('mean') > 0: + pos_info = {x.name: x.pos for x in trader.positions} + logger.info(f"{symbol} trader pos:{pos_info} | ensemble_pos: {trader.get_ensemble_pos('mean')}") except Exception as e: logger.error(f"{symbol} 更新交易策略失败,原因是 {e}") + def report(self): + """报告状态""" + from czsc.utils import WordWriter + + writer = WordWriter() + writer.add_title("QMT 交易报告") + assets = self.get_assets() + + writer.add_heading('一、账户状态', level=1) + writer.add_paragraph(f"交易品种数量:{len(self.traders)}\n" + f"传入品种数量:{len(self.symbols)}\n" + f"交易账户:{self.account_id}\n" + f"账户资产:{assets.total_asset}\n" + f"可用资金:{assets.cash}\n" + f"持仓市值:{assets.market_value}\n" + f"持仓情况:", + first_line_indent=0) + + sp = self.query_stock_positions() + if sp: + _res_sp = [] + for k, v in sp.items(): + _res_sp.append({'品种': k, '持仓股数': v.volume, '可用股数': v.can_use_volume, + '成本': v.open_price, '市值': v.market_value}) + writer.add_df_table(pd.DataFrame(_res_sp)) + else: + writer.add_paragraph("当前没有持仓", first_line_indent=0) + + writer.add_heading('二、策略状态', level=1) + + _res = [] + for symbol, trader in self.traders.items(): + if trader.get_ensemble_pos('mean') > 0: + _res.append({'symbol': symbol, 'pos': trader.get_ensemble_pos('mean'), + 'positions': {x.name: x.pos for x in trader.positions}}) + if _res: + writer.add_df_table(pd.DataFrame(_res)) + else: + writer.add_paragraph("当前所有品种都是空仓") + + file_docx = f"QMT_交易报告_{datetime.now().strftime('%Y%m%d_%H%M')}.docx" + writer.save(file_docx) + self.callback.push_message(file_docx, msg_type='file') + os.remove(file_docx) + def run(self, mode='30m'): """运行策略""" + self.report() + if mode.lower() == '15m': _times = ["09:45", "10:00", "10:15", "10:30", "10:45", "11:00", "11:15", "11:30", "13:15", "13:30", "13:45", "14:00", "14:15", "14:30", "14:45", "15:00"] @@ -441,6 +599,8 @@ def run(self, mode='30m'): while 1: if datetime.now().strftime("%H:%M") in _times: self.update_traders() + self.report() + time.sleep(60) else: time.sleep(3) @@ -450,6 +610,10 @@ def run(self, mode='30m'): self.xtt.start() +# ====================================================================================================================== +# 以下是测试代码 +# ====================================================================================================================== + def test_get_kline(): # 获取所有板块 slt = xtdata.get_sector_list() @@ -479,5 +643,3 @@ def test_get_symbols(): assert len(symbols) > 0 symbols = get_symbols('etfs') assert len(symbols) > 0 - - diff --git a/czsc/fsa/base.py b/czsc/fsa/base.py index d46791173..ef5f9c86a 100644 --- a/czsc/fsa/base.py +++ b/czsc/fsa/base.py @@ -20,7 +20,7 @@ from requests_toolbelt import MultipartEncoder -@retry(stop=stop_after_attempt(10), wait=wait_random(min=3, max=10)) +@retry(stop=stop_after_attempt(3), wait=wait_random(min=1, max=5)) def request(method, url, headers, payload=None) -> dict: """飞书API标准请求 diff --git a/czsc/gms/__init__.py b/czsc/gms/__init__.py deleted file mode 100644 index 2ab88c212..000000000 --- a/czsc/gms/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -# -*- coding: utf-8 -*- -""" -author: zengbin93 -email: zeng_bin8888@163.com -create_dt: 2022/8/8 22:29 -describe: 掘金量化终端对接 -""" diff --git a/czsc/gms/gm_base.py b/czsc/gms/gm_base.py deleted file mode 100644 index b687668ce..000000000 --- a/czsc/gms/gm_base.py +++ /dev/null @@ -1,466 +0,0 @@ -# -*- coding: utf-8 -*- -""" -author: zengbin93 -email: zeng_bin8888@163.com -create_dt: 2021/11/17 22:11 -describe: 配合 CzscAdvancedTrader 进行使用的掘金工具 -""" -import os -import dill -import czsc -import pandas as pd -from loguru import logger -try: - from gm.api import * -except: - logger.warning(f"gm 模块没有安装") -from datetime import datetime, timedelta -from collections import OrderedDict -from typing import List, Callable -from czsc import CzscAdvancedTrader, create_advanced_trader -from czsc.data import freq_cn2gm -from czsc.utils import qywx as wx -from czsc.utils import BarGenerator -from czsc.objects import RawBar, Freq - - -dt_fmt = "%Y-%m-%d %H:%M:%S" -date_fmt = "%Y-%m-%d" - -assert czsc.__version__ >= "0.8.29" - - -def set_gm_token(token): - with open(os.path.join(os.path.expanduser("~"), "gm_token.txt"), 'w', encoding='utf-8') as f: - f.write(token) - - -file_token = os.path.join(os.path.expanduser("~"), "gm_token.txt") -if not os.path.exists(file_token): - print("{} 文件不存在,请单独启动一个 python 终端,调用 set_gm_token 方法创建该文件,再重新执行。".format(file_token)) -else: - gm_token = open(file_token, encoding="utf-8").read() - set_token(gm_token) - - -def is_trade_date(dt): - """判断 dt 时刻是不是交易日期""" - dt = pd.to_datetime(dt) - date_ = dt.strftime("%Y-%m-%d") - trade_dates = get_trading_dates(exchange='SZSE', start_date=date_, end_date=date_) - if trade_dates: - return True - else: - return False - - -def is_trade_time(dt): - """判断 dt 时刻是不是交易时间""" - dt = pd.to_datetime(dt) - date_ = dt.strftime("%Y-%m-%d") - trade_dates = get_trading_dates(exchange='SZSE', start_date=date_, end_date=date_) - if trade_dates and "15:00" > dt.strftime("%H:%M") > "09:30": - return True - else: - return False - - -def get_symbol_names(): - """获取股票市场标的列表,包括股票、指数等""" - df = get_instruments(exchanges='SZSE,SHSE', fields="symbol,sec_name", df=True) - shares = {row['symbol']: row['sec_name'] for _, row in df.iterrows()} - return shares - - -def format_kline(df, freq: Freq): - bars = [] - for i, row in df.iterrows(): - # amount 单位:元 - bar = RawBar(symbol=row['symbol'], id=i, freq=freq, dt=row['eob'], open=round(row['open'], 2), - close=round(row['close'], 2), high=round(row['high'], 2), - low=round(row['low'], 2), vol=row['volume'], amount=row['amount']) - bars.append(bar) - return bars - - -def get_kline(symbol, end_time, freq='60s', count=33000, adjust=ADJUST_PREV): - """获取K线数据 - - :param symbol: 标的代码 - :param end_time: 结束时间 - :param freq: K线周期 - :param count: K线数量 - :param adjust: 复权方式 - :return: - """ - if isinstance(end_time, datetime): - end_time = end_time.strftime(dt_fmt) - - exchange = symbol.split(".")[0] - freq_map_ = {'60s': Freq.F1, '300s': Freq.F5, '900s': Freq.F15, '1800s': Freq.F30, - '3600s': Freq.F60, '1d': Freq.D} - - if exchange in ["SZSE", "SHSE"]: - df = history_n(symbol=symbol, frequency=freq, end_time=end_time, adjust=adjust, - fields='symbol,eob,open,close,high,low,volume,amount', count=count, df=True) - else: - df = history_n(symbol=symbol, frequency=freq, end_time=end_time, adjust=adjust, - fields='symbol,eob,open,close,high,low,volume,amount,position', count=count, df=True) - return format_kline(df, freq_map_[freq]) - - -def get_init_bg(symbol: str, - end_dt: [str, datetime], - base_freq: str, - freqs: List[str], - max_count=1000, - adjust=ADJUST_PREV): - """获取 symbol 的初始化 bar generator""" - if isinstance(end_dt, str): - end_dt = pd.to_datetime(end_dt, utc=True) - end_dt = end_dt.tz_convert('dateutil/PRC') - # 时区转换之后,要减去8个小时才是设置的时间 - end_dt = end_dt - timedelta(hours=8) - else: - assert end_dt.tzinfo._filename == 'PRC' - - delta_days = 180 - last_day = (end_dt - timedelta(days=delta_days)).replace(hour=16, minute=0) - - bg = BarGenerator(base_freq, freqs, max_count) - if "周线" in freqs or "月线" in freqs: - d_bars = get_kline(symbol, last_day, freq_cn2gm["日线"], count=5000, adjust=adjust) - bgd = BarGenerator("日线", ['周线', '月线', '季线', '年线']) - for b in d_bars: - bgd.update(b) - else: - bgd = None - - for freq in bg.bars.keys(): - if freq in ['周线', '月线', '季线', '年线']: - bars_ = bgd.bars[freq] - else: - bars_ = get_kline(symbol, last_day, freq_cn2gm[freq], max_count, adjust) - bg.init_freq_bars(freq, bars_) - print(f"{symbol} - {freq} - {len(bg.bars[freq])} - last_dt: {bg.bars[freq][-1].dt} - last_day: {last_day}") - - bars2 = get_kline(symbol, end_dt, freq_cn2gm[base_freq], - count=int(240 / int(base_freq.strip('分钟')) * delta_days)) - data = [x for x in bars2 if x.dt > last_day] - assert len(data) > 0 - print(f"{symbol}: bar generator 最新时间 {bg.bars[base_freq][-1].dt.strftime(dt_fmt)},还有{len(data)}行数据需要update") - return bg, data - - -order_side_map = {OrderSide_Unknown: '其他', OrderSide_Buy: '买入', OrderSide_Sell: '卖出'} -order_status_map = { - OrderStatus_Unknown: "其他", - OrderStatus_New: "已报", - OrderStatus_PartiallyFilled: "部成", - OrderStatus_Filled: "已成", - OrderStatus_Canceled: "已撤", - OrderStatus_PendingCancel: "待撤", - OrderStatus_Rejected: "已拒绝", - OrderStatus_Suspended: "挂起(无效)", - OrderStatus_PendingNew: "待报", - OrderStatus_Expired: "已过期", -} -pos_side_map = {PositionSide_Unknown: '其他', PositionSide_Long: '多头', PositionSide_Short: '空头'} -pos_effect_map = { - PositionEffect_Unknown: '其他', - PositionEffect_Open: '开仓', - PositionEffect_Close: '平仓', - PositionEffect_CloseToday: '平今仓', - PositionEffect_CloseYesterday: '平昨仓', -} -exec_type_map = { - ExecType_Unknown: "其他", - ExecType_New: "已报", - ExecType_Canceled: "已撤销", - ExecType_PendingCancel: "待撤销", - ExecType_Rejected: "已拒绝", - ExecType_Suspended: "挂起", - ExecType_PendingNew: "待报", - ExecType_Expired: "过期", - ExecType_Trade: "成交(有效)", - ExecType_OrderStatus: "委托状态", - ExecType_CancelRejected: "撤单被拒绝(有效)", -} - - -def on_order_status(context, order): - """ - https://www.myquant.cn/docs/python/python_object_trade#007ae8f5c7ec5298 - - :param context: - :param order: - :return: - """ - if not is_trade_time(context.now): - return - - symbol = order.symbol - latest_dt = context.now.strftime("%Y-%m-%d %H:%M:%S") - - if symbol not in context.symbols_info.keys(): - msg = f"订单状态更新通知:\n{'*' * 31}\n" \ - f"更新时间:{latest_dt}\n" \ - f"标的名称:{symbol} {context.stocks.get(symbol, '无名')}\n" \ - f"操作类型:{order_side_map[order.side]}{pos_effect_map[order.position_effect]}\n" \ - f"操作描述:非机器交易标的\n" \ - f"下单价格:{round(order.price, 2)}\n" \ - f"最新状态:{order_status_map[order.status]}\n" \ - f"委托(股):{int(order.volume)}\n" \ - f"已成(股):{int(order.filled_volume)}\n" \ - f"均价(元):{round(order.filled_vwap, 2)}" - - else: - trader: CzscAdvancedTrader = context.symbols_info[symbol]['trader'] - if trader.long_pos.operates: - last_op_desc = trader.long_pos.operates[-1]['op_desc'] - else: - last_op_desc = "" - - msg = f"订单状态更新通知:\n{'*' * 31}\n" \ - f"更新时间:{latest_dt}\n" \ - f"标的名称:{symbol} {context.stocks.get(symbol, '无名')}\n" \ - f"操作类型:{order_side_map[order.side]}{pos_effect_map[order.position_effect]}\n" \ - f"操作描述:{last_op_desc}\n" \ - f"下单价格:{round(order.price, 2)}\n" \ - f"最新状态:{order_status_map[order.status]}\n" \ - f"委托(股):{int(order.volume)}\n" \ - f"已成(股):{int(order.filled_volume)}\n" \ - f"均价(元):{round(order.filled_vwap, 2)}" - - logger.info(msg.replace("\n", " - ").replace('*', "")) - if context.mode != MODE_BACKTEST and order.status in [1, 3, 5, 8, 9, 12]: - wx.push_text(content=str(msg), key=context.wx_key) - - -def on_execution_report(context, execrpt): - """响应委托被执行事件,委托成交或者撤单拒绝后被触发。 - - https://www.myquant.cn/docs/python/python_trade_event#on_execution_report%20-%20%E5%A7%94%E6%89%98%E6%89%A7%E8%A1%8C%E5%9B%9E%E6%8A%A5%E4%BA%8B%E4%BB%B6 - https://www.myquant.cn/docs/python/python_object_trade#ExecRpt%20-%20%E5%9B%9E%E6%8A%A5%E5%AF%B9%E8%B1%A1 - - :param context: - :param execrpt: - :return: - """ - if not is_trade_time(context.now): - return - - latest_dt = context.now.strftime(dt_fmt) - msg = f"委托订单被执行通知:\n{'*' * 31}\n" \ - f"时间:{latest_dt}\n" \ - f"标的:{execrpt.symbol}\n" \ - f"名称:{context.stocks.get(execrpt.symbol, '无名')}\n" \ - f"方向:{order_side_map[execrpt.side]}{pos_effect_map[execrpt.position_effect]}\n" \ - f"成交量:{int(execrpt.volume)}\n" \ - f"成交价:{round(execrpt.price, 2)}\n" \ - f"执行回报类型:{exec_type_map[execrpt.exec_type]}" - - logger.info(msg.replace("\n", " - ").replace('*', "")) - if context.mode != MODE_BACKTEST and execrpt.exec_type in [1, 5, 6, 8, 12, 19]: - wx.push_text(content=str(msg), key=context.wx_key) - - -def on_backtest_finished(context, indicator): - """回测结束回调函数 - - :param context: - :param indicator: - https://www.myquant.cn/docs/python/python_object_trade#bd7f5adf22081af5 - :return: - """ - wx_key = context.wx_key - symbols = context.symbols - data_path = context.data_path - - logger.info(str(indicator)) - logger.info("回测结束 ... ") - cash = context.account().cash - - for k, v in indicator.items(): - if isinstance(v, float): - indicator[k] = round(v, 4) - - row = OrderedDict({ - "标的数量": len(context.symbols_info.keys()), - "开始时间": context.backtest_start_time, - "结束时间": context.backtest_end_time, - "累计收益": indicator['pnl_ratio'], - "最大回撤": indicator['max_drawdown'], - "年化收益": indicator['pnl_ratio_annual'], - "夏普比率": indicator['sharp_ratio'], - "盈利次数": indicator['win_count'], - "亏损次数": indicator['lose_count'], - "交易胜率": indicator['win_ratio'], - "累计出入金": int(cash['cum_inout']), - "累计交易额": int(cash['cum_trade']), - "累计手续费": int(cash['cum_commission']), - "累计平仓收益": int(cash['cum_pnl']), - "净收益": int(cash['pnl']), - }) - sdt = pd.to_datetime(context.backtest_start_time).strftime('%Y%m%d') - edt = pd.to_datetime(context.backtest_end_time).strftime('%Y%m%d') - file_xlsx = os.path.join(data_path, f'{context.name}_{sdt}_{edt}.xlsx') - file = pd.ExcelWriter(file_xlsx, mode='w') - - dfe = pd.DataFrame({"指标": list(row.keys()), "值": list(row.values())}) - dfe.to_excel(file, sheet_name='回测表现', index=False) - - logger.info("回测结果:{}".format(row)) - content = "" - for k, v in row.items(): - content += "{}: {}\n".format(k, v) - wx.push_text(content=content, key=wx_key) - - trades = [] - operates = [] - performances = [] - for symbol in symbols: - trader: CzscAdvancedTrader = context.symbols_info[symbol]['trader'] - trades.extend(trader.long_pos.pairs) - operates.extend(trader.long_pos.operates) - performances.append(trader.long_pos.evaluate_operates()) - - df = pd.DataFrame(trades) - df['开仓时间'] = df['开仓时间'].apply(lambda x: x.strftime("%Y-%m-%d %H:%M")) - df['平仓时间'] = df['平仓时间'].apply(lambda x: x.strftime("%Y-%m-%d %H:%M")) - df.to_excel(file, sheet_name='交易汇总', index=False) - - dfo = pd.DataFrame(operates) - dfo['dt'] = dfo['dt'].apply(lambda x: x.strftime("%Y-%m-%d %H:%M")) - dfo.to_excel(file, sheet_name='操作汇总', index=False) - - dfp = pd.DataFrame(performances) - dfp.to_excel(file, sheet_name='表现汇总', index=False) - file.close() - - wx.push_file(file_xlsx, wx_key) - - -def on_error(context, code, info): - if not is_trade_time(context.now): - return - - msg = "{} - {}".format(code, info) - logger.warning(msg) - if context.mode != MODE_BACKTEST: - wx.push_text(content=msg, key=context.wx_key) - - -def on_account_status(context, account): - """响应交易账户状态更新事件,交易账户状态变化时被触发 - https://www.myquant.cn/docs/python/python_trade_event#4f07d24fc4314e3c - """ - status = account['status'] - if status['state'] == 3: - return - - if not is_trade_time(context.now): - return - - msg = f"{str(account)}" - logger.warning(msg) - if context.mode != MODE_BACKTEST: - wx.push_text(content=msg, key=context.wx_key) - - -def is_order_exist(context, symbol, side) -> bool: - """判断同方向订单是否已经存在 - - :param context: - :param symbol: 交易标的 - :param side: 交易方向 - :return: bool - """ - uo = context.unfinished_orders - if not uo: - return False - else: - for o in uo: - if o.symbol == symbol and o.side == side: - context.logger.info("同类型订单已存在:{} - {}".format(symbol, side)) - return True - return False - - -def cancel_timeout_orders(context, max_m=30): - """实盘仿真,撤销挂单时间超过 max_m 分钟的订单。 - - :param context: - :param max_m: 最大允许挂单分钟数 - :return: - """ - for u_order in context.unfinished_orders: - if context.now - u_order.created_at >= timedelta(minutes=max_m): - order_cancel(u_order) - - -def gm_take_snapshot(gm_symbol, end_dt=None, file_html=None, - freqs=('1分钟', '5分钟', '15分钟', '30分钟', '60分钟', '日线', '周线', '月线'), - adjust=ADJUST_PREV, max_count=1000): - """使用掘金的数据对任意标的、任意时刻的状态进行快照 - - :param gm_symbol: - :param end_dt: - :param file_html: - :param freqs: - :param adjust: - :param max_count: - :return: - """ - if not end_dt: - end_dt = datetime.now().strftime(dt_fmt) - - bg, data = get_init_bg(gm_symbol, end_dt, freqs[0], freqs[1:], max_count, adjust) - ct = CzscAdvancedTrader(bg) - for bar in data: - ct.update(bar) - - if file_html: - ct.take_snapshot(file_html) - print(f'saved into {file_html}') - else: - ct.open_in_browser() - return ct - - -def strategy_snapshot(symbol, strategy: Callable, end_dt=None, file_html=None, adjust=ADJUST_PREV, max_count=1000): - """使用掘金的数据对任意标的、任意时刻的状态进行策略快照 - - :param symbol: 交易标的 - :param strategy: 择时交易策略 - :param end_dt: 结束时间,精确到分钟 - :param file_html: 结果文件 - :param adjust: 复权类型 - :param max_count: 最大K线数量 - :return: trader - """ - tactic = strategy(symbol) - base_freq = tactic['base_freq'] - freqs = tactic['freqs'] - bg, data = get_init_bg(symbol, end_dt, base_freq, freqs, max_count, adjust) - trader = create_advanced_trader(bg, data, strategy) - if file_html: - trader.take_snapshot(file_html) - print(f'saved into {file_html}') - else: - trader.open_in_browser() - return trader - - -def save_traders(context): - """实盘:保存交易员快照""" - if context.now.isoweekday() > 5: - print(f"save_traders: {context.now} 不是交易时间") - return - - for symbol in context.symbols_info.keys(): - trader: CzscAdvancedTrader = context.symbols_info[symbol]['trader'] - if context.mode != MODE_BACKTEST: - file_trader = os.path.join(context.data_path, f'traders/{symbol}.cat') - dill.dump(trader, open(file_trader, 'wb')) - diff --git a/czsc/gms/gm_stocks.py b/czsc/gms/gm_stocks.py deleted file mode 100644 index 9245b6b4f..000000000 --- a/czsc/gms/gm_stocks.py +++ /dev/null @@ -1,400 +0,0 @@ -# -*- coding: utf-8 -*- -""" -author: zengbin93 -email: zeng_bin8888@163.com -create_dt: 2021/11/17 22:11 -describe: 配合 CzscAdvancedTrader 进行使用的掘金工具 -""" -import inspect -import traceback -from czsc.gms.gm_base import * -# from czsc.utils import create_logger -from czsc.objects import PositionLong, Operate - - -indices = { - "上证指数": 'SHSE.000001', - "上证50": 'SHSE.000016', - "沪深300": "SHSE.000300", - "中证1000": "SHSE.000852", - "中证500": "SHSE.000905", - - "深证成指": "SZSE.399001", - "创业板指数": 'SZSE.399006', - "深次新股": "SZSE.399678", - "中小板指": "SZSE.399005", - "国证2000": "SZSE.399303", - "小盘成长": "SZSE.399376", - "小盘价值": "SZSE.399377", -} - - -def get_index_shares(name, end_date=None): - """获取某一交易日的指数成分股列表 - - symbols = get_index_shares("上证50", "2019-01-01 09:30:00") - """ - if not end_date: - end_date = datetime.now().strftime(date_fmt) - else: - end_date = pd.to_datetime(end_date).strftime(date_fmt) - constituents = get_history_constituents(indices[name], end_date, end_date)[0] - symbol_list = [k for k, v in constituents['constituents'].items()] - return list(set(symbol_list)) - - -def on_bar(context, bars): - """订阅K线回调函数""" - context.unfinished_orders = get_unfinished_orders() - cancel_timeout_orders(context, max_m=30) - - for bar in bars: - symbol = bar['symbol'] - trader: CzscAdvancedTrader = context.symbols_info[symbol]['trader'] - - # 确保数据更新到最新时刻 - base_freq = trader.base_freq - bars = context.data(symbol=symbol, frequency=freq_cn2gm[base_freq], count=100, - fields='symbol,eob,open,close,high,low,volume,amount') - bars = format_kline(bars, freq=trader.bg.freq_map[base_freq]) - bars_new = [x for x in bars if x.dt > trader.bg.bars[base_freq][-1].dt] - if bars_new: - for bar_ in bars_new: - trader.update(bar_) - - sync_long_position(context, trader) - - -def report_account_status(context): - """报告账户持仓状态""" - if context.now.isoweekday() > 5: - return - - latest_dt = context.now.strftime(dt_fmt) - account = context.account(account_id=context.account_id) - cash = account.cash - positions = account.positions() - - logger.info("=" * 30 + f" 账户状态【{latest_dt}】 " + "=" * 30) - cash_report = f"净值:{int(cash.nav)},可用资金:{int(cash.available)}," \ - f"浮动盈亏:{int(cash.fpnl)},标的数量:{len(positions)}" - logger.info(cash_report) - - for p in positions: - p_report = f"标的:{p.symbol},名称:{context.stocks.get(p.symbol, '无名')}," \ - f"数量:{p.volume},成本:{round(p.vwap, 2)},方向:{p.side}," \ - f"当前价:{round(p.price, 2)},成本市值:{int(p.volume * p.vwap)}," \ - f"建仓时间:{p.created_at.strftime(dt_fmt)}" - logger.info(p_report) - - # 实盘或仿真,推送账户信息到企业微信 - if context.mode != MODE_BACKTEST: - - msg = f"股票账户状态报告\n{'*' * 31}\n" - msg += f"账户净值:{int(cash.nav)}\n" \ - f"持仓市值:{int(cash.market_value)}\n" \ - f"可用资金:{int(cash.available)}\n" \ - f"浮动盈亏:{int(cash.fpnl)}\n" \ - f"标的数量:{len(positions)}\n" - wx.push_text(msg.strip("\n *"), key=context.wx_key) - - results = [] - for symbol, info in context.symbols_info.items(): - name = context.stocks.get(symbol, '无名') - trader: CzscAdvancedTrader = context.symbols_info[symbol]['trader'] - p = account.position(symbol=symbol, side=PositionSide_Long) - - row = {'交易标的': symbol, '标的名称': name, - '最新时间': trader.end_dt.strftime(dt_fmt), - '最新价格': trader.latest_price} - - if "日线" in trader.kas.keys(): - bar1, bar2 = trader.kas['日线'].bars_raw[-2:] - row.update({'昨日收盘': round(bar1.close, 2), - '今日涨幅': round(bar2.close / bar1.close - 1, 4)}) - - if trader.long_pos.pos > 0: - row.update({'多头持仓': trader.long_pos.pos, - '多头成本': trader.long_pos.long_cost, - '多头收益': round(trader.latest_price / trader.long_pos.long_cost - 1, 4), - '开多时间': trader.long_pos.operates[-1]['dt'].strftime(dt_fmt)}) - else: - row.update({'多头持仓': 0, '多头成本': 0, '多头收益': 0, '开多时间': None}) - - if p: - row.update({"实盘持仓数量": p.volume, - "实盘持仓成本": round(p.vwap, 2), - "实盘持仓市值": int(p.volume * p.vwap)}) - else: - row.update({"实盘持仓数量": 0, "实盘持仓成本": 0, "实盘持仓市值": 0}) - - results.append(row) - - df = pd.DataFrame(results) - df.sort_values(['多头持仓', '多头收益'], ascending=False, inplace=True, ignore_index=True) - file_xlsx = os.path.join(context.data_path, f"holds_{context.now.strftime('%Y%m%d_%H%M')}.xlsx") - df.to_excel(file_xlsx, index=False) - wx.push_file(file_xlsx, key=context.wx_key) - os.remove(file_xlsx) - - # 提示非策略交易标的持仓 - process_out_of_symbols(context) - - -def sync_long_position(context, trader: CzscAdvancedTrader): - """同步多头仓位到交易账户""" - if not trader.long_events: - return - - symbol = trader.symbol - name = context.stocks.get(symbol, "无名标的") - long_pos: PositionLong = trader.long_pos - max_sym_pos = context.symbols_info[symbol]['max_sym_pos'] # 最大标的仓位 - if context.mode == MODE_BACKTEST: - account = context.account() - else: - account = context.account(account_id=context.account_id) - cash = account.cash - - price = trader.latest_price - print(f"{trader.end_dt}: {name},多头:{long_pos.pos},成本:{long_pos.long_cost}," - f"现价:{price},操作次数:{len(long_pos.operates)}") - - algo_name = os.environ.get('algo_name', None) - if algo_name: - # 算法名称,TWAP、VWAP、ATS-SMART、ZC-POV - algo_name = algo_name.upper() - start_time = trader.end_dt.strftime("%H:%M:%S") - end_time = (trader.end_dt + timedelta(minutes=30)).strftime("%H:%M:%S") - end_time = min(end_time, '14:55:00') - - if algo_name == 'TWAP' or algo_name == 'VWAP' or algo_name == 'ZC-POV': - algo_param = { - "start_time": start_time, - "end_time": end_time, - "part_rate": 0.5, - "min_amount": 5000, - } - elif algo_name == 'ATS-SMART': - algo_param = { - 'start_time': start_time, - 'end_time_referred': end_time, - 'end_time': end_time, - 'end_time_valid': 1, - 'stop_sell_when_dl': 1, - 'cancel_when_pl': 0, - 'min_trade_amount': 5000 - } - else: - raise ValueError("算法单名称输入错误") - else: - algo_param = {} - - sym_position = account.position(symbol, PositionSide_Long) - if long_pos.pos == 0 and not sym_position: - # 如果多头仓位为0且掘金账户没有对应持仓,直接退出 - return - - if long_pos.pos == 0 and sym_position and sym_position.volume > 0: - # 如果多头仓位为0且掘金账户依然还有持仓,清掉仓位 - volume = sym_position.volume - if algo_name: - assert len(algo_param) > 0, f"error: {algo_name}, {algo_param}" - _ = algo_order(symbol=symbol, volume=volume, side=OrderSide_Sell, - order_type=OrderType_Limit, position_effect=PositionEffect_Close, - price=price, algo_name=algo_name, algo_param=algo_param, account=account.id) - else: - order_target_volume(symbol=symbol, volume=0, position_side=PositionSide_Long, - order_type=OrderType_Limit, price=price, account=account.id) - return - - if not long_pos.pos_changed: - return - - assert long_pos.pos > 0 - cash_left = cash.available - if long_pos.operates[-1]['op'] in [Operate.LO, Operate.LA1, Operate.LA2]: - change_amount = max_sym_pos * long_pos.operates[-1]['pos_change'] * cash.nav - if cash_left < change_amount: - logger.info(f"{context.now} {symbol} {name} 可用资金不足,无法开多仓;" - f"剩余资金{int(cash_left)}元,所需资金{int(change_amount)}元") - return - - if is_order_exist(context, symbol, PositionSide_Long): - logger.info(f"{context.now} {symbol} {name} 同方向订单已存在") - return - - percent = max_sym_pos * long_pos.pos - volume = int((cash.nav * percent / price // 100) * 100) # 单位:股 - if algo_name: - _ = algo_order(symbol=symbol, volume=volume, side=OrderSide_Buy, - order_type=OrderType_Limit, position_effect=PositionEffect_Open, - price=price, algo_name=algo_name, algo_param=algo_param, account=account.id) - else: - order_target_volume(symbol=symbol, volume=volume, position_side=PositionSide_Long, - order_type=OrderType_Limit, price=price, account=account.id) - - -def check_index_status(qywx_key): - """查看主要指数状态""" - from czsc.utils.cache import home_path - - wx.push_text(f"{datetime.now()} 开始获取主要指数行情快照", qywx_key) - for gm_symbol in indices.values(): - try: - file_html = os.path.join(home_path, f"{gm_symbol}_{datetime.now().strftime('%Y%m%d')}.html") - gm_take_snapshot(gm_symbol, file_html=file_html) - wx.push_file(file_html, qywx_key) - os.remove(file_html) - except: - traceback.print_exc() - wx.push_text(f"{datetime.now()} 获取主要指数行情快照获取结束,请仔细观察!!!", qywx_key) - - -def realtime_check_index_status(context): - """实盘:发送主要指数行情图表""" - if context.now.isoweekday() > 5: - print(f"realtime_check_index_status: {context.now} 不是交易时间") - return - - check_index_status(context.wx_key) - - -def process_out_of_symbols(context): - """实盘:处理不在交易列表的持仓股""" - if context.now.isoweekday() > 5: - print(f"process_out_of_symbols: {context.now} 不是交易时间") - return - - if context.mode == MODE_BACKTEST: - print(f"process_out_of_symbols: 回测模式下不需要执行") - return - - account = context.account(account_id=context.account_id) - positions = account.positions(symbol="", side=PositionSide_Long) - - oos = [] - for p in positions: - symbol = p.symbol - if p.volume > 0 and p.symbol not in context.symbols_info.keys(): - oos.append(symbol) - # order_target_volume(symbol=symbol, volume=0, position_side=PositionSide_Long, - # order_type=OrderType_Limit, price=p.price, account=account.id) - if oos: - wx.push_text(f"不在交易列表的持仓股:{', '.join(oos)}", context.wx_key) - - -def init_context_universal(context, name): - """通用 context 初始化:1、创建文件目录和日志记录 - - :param context: - :param name: 交易策略名称,建议使用英文 - """ - path_gm_logs = os.environ.get('path_gm_logs', None) - if context.mode == MODE_BACKTEST: - data_path = os.path.join(path_gm_logs, f"backtest/{name}_{datetime.now().strftime('%Y%m%d_%H%M%S')}") - else: - data_path = os.path.join(path_gm_logs, f"realtime/{name}") - os.makedirs(data_path, exist_ok=True) - - context.name = name - context.data_path = data_path - context.stocks = get_symbol_names() - - logger.add(os.path.join(data_path, "gm_trader.log"), rotation="500MB", - encoding="utf-8", enqueue=True, retention="1 days") - logger.info("运行配置:") - logger.info(f"data_path = {data_path}") - - if context.mode == MODE_BACKTEST: - logger.info("backtest_start_time = " + str(context.backtest_start_time)) - logger.info("backtest_end_time = " + str(context.backtest_end_time)) - - -def init_context_env(context): - """通用 context 初始化:2、读入环境变量 - - :param context: - """ - context.wx_key = os.environ['wx_key'] - context.account_id = os.environ.get('account_id', '') - if context.mode != MODE_BACKTEST: - assert len(context.account_id) > 10, "非回测模式,必须设置 account_id " - - # 单个标的仓位控制[0, 1],按资金百分比控制,1表示满仓,仅在开仓的时候控制 - context.max_sym_pos = float(os.environ['max_sym_pos']) - assert 0 <= context.max_sym_pos <= 1 - - logger.info(f"环境变量读取结果如下:") - logger.info(f"单标的控制:context.max_sym_pos = {context.max_sym_pos}") - - -def init_context_traders(context, symbols: List[str], strategy: Callable): - """通用 context 初始化:3、为每个标的创建 trader - - :param context: - :param symbols: 交易标的列表 - :param strategy: 交易策略 - :return: - """ - with open(os.path.join(context.data_path, f'{strategy.__name__}.txt'), mode='w') as f: - f.write(inspect.getsource(strategy)) - - tactic = strategy("000001") - base_freq, freqs = tactic['base_freq'], tactic['freqs'] - frequency = freq_cn2gm[base_freq] - unsubscribe(symbols='*', frequency=frequency) - - data_path = context.data_path - logger.info(f"输入交易标的数量:{len(symbols)}") - logger.info(f"交易员的周期列表:base_freq = {base_freq}; freqs = {freqs}") - - os.makedirs(os.path.join(data_path, 'traders'), exist_ok=True) - symbols_info = {symbol: dict() for symbol in symbols} - for symbol in symbols: - try: - symbols_info[symbol]['max_sym_pos'] = context.max_sym_pos - file_trader = os.path.join(data_path, f'traders/{symbol}.cat') - - if os.path.exists(file_trader) and context.mode != MODE_BACKTEST: - trader: CzscAdvancedTrader = dill.load(open(file_trader, 'rb')) - logger.info(f"{symbol} Loaded Trader from {file_trader}") - - else: - bg, data = get_init_bg(symbol, context.now, base_freq, freqs, 1000, ADJUST_PREV) - trader = create_advanced_trader(bg, data, strategy) - dill.dump(trader, open(file_trader, 'wb')) - - symbols_info[symbol]['trader'] = trader - logger.info("{} Trader 构建成功,最新时间:{},多仓:{}".format(symbol, trader.end_dt, trader.long_pos.pos)) - - except: - del symbols_info[symbol] - logger.info(f"{symbol} - {context.stocks.get(symbol, '无名')} 初始化失败,当前时间:{context.now}") - traceback.print_exc() - - subscribe(",".join(symbols_info.keys()), frequency=frequency, count=300, wait_group=False) - logger.info(f"订阅成功数量:{len(symbols_info)}") - logger.info(f"交易标的配置:{symbols_info}") - context.symbols_info = symbols_info - - -def init_context_schedule(context): - """通用 context 初始化:设置定时任务""" - schedule(schedule_func=report_account_status, date_rule='1d', time_rule='09:31:00') - schedule(schedule_func=report_account_status, date_rule='1d', time_rule='10:01:00') - schedule(schedule_func=report_account_status, date_rule='1d', time_rule='10:31:00') - schedule(schedule_func=report_account_status, date_rule='1d', time_rule='11:01:00') - schedule(schedule_func=report_account_status, date_rule='1d', time_rule='11:31:00') - schedule(schedule_func=report_account_status, date_rule='1d', time_rule='13:01:00') - schedule(schedule_func=report_account_status, date_rule='1d', time_rule='13:31:00') - schedule(schedule_func=report_account_status, date_rule='1d', time_rule='14:01:00') - schedule(schedule_func=report_account_status, date_rule='1d', time_rule='14:31:00') - schedule(schedule_func=report_account_status, date_rule='1d', time_rule='15:01:00') - - # 以下是 实盘/仿真 模式下的定时任务 - if context.mode != MODE_BACKTEST: - schedule(schedule_func=save_traders, date_rule='1d', time_rule='11:40:00') - schedule(schedule_func=save_traders, date_rule='1d', time_rule='15:10:00') - # schedule(schedule_func=realtime_check_index_status, date_rule='1d', time_rule='17:30:00') - # schedule(schedule_func=process_out_of_symbols, date_rule='1d', time_rule='09:40:00') diff --git a/czsc/objects.py b/czsc/objects.py index 5fda84868..d7ecbf877 100644 --- a/czsc/objects.py +++ b/czsc/objects.py @@ -6,6 +6,9 @@ describe: 常用对象结构 """ import math +import pandas as pd +import numpy as np +from copy import deepcopy from dataclasses import dataclass from datetime import datetime from loguru import logger @@ -627,15 +630,16 @@ def __init__(self, symbol: str, opens: List[Event], exits: List[Event] = None, i self.stop_loss = stop_loss self.T0 = T0 + self.pos_changed = False # 仓位是否发生变化 self.operates = [] # 事件触发的操作列表 - self.holds = [] # 持仓状态列表 + self.holds = [] # 持仓状态列表 self.pos = 0 # 辅助判断的缓存数据 self.last_event = {'dt': None, 'bid': None, 'price': None, "op": None, 'op_desc': None} - self.last_lo_dt = None # 最近一次开多交易的时间 - self.last_so_dt = None # 最近一次开空交易的时间 - self.end_dt = None # 最近一次信号传入的时间 + self.last_lo_dt = None # 最近一次开多交易的时间 + self.last_so_dt = None # 最近一次开空交易的时间 + self.end_dt = None # 最近一次信号传入的时间 def __repr__(self): return f"Position(name={self.name}, symbol={self.symbol}, opens={[x.name for x in self.opens]}, " \ @@ -654,28 +658,9 @@ def dump(self, with_data=False): "T0": self.T0, } if with_data: - raw.update({"pairs": self.pairs, "holds": self.holds}) + raw.update({"pairs": self.pairs, "holds": self.holds}) return raw - def __two_operates_pair(self, op1, op2): - assert op1['op'] in [Operate.LO, Operate.SO] - pair = { - '标的代码': self.symbol, - '策略标记': self.name, - '交易方向': "多头" if op1['op'] == Operate.LO else "空头", - '开仓时间': op1['dt'], - '平仓时间': op2['dt'], - '开仓价格': op1['price'], - '平仓价格': op2['price'], - '持仓K线数': op2['bid'] - op1['bid'], - '事件序列': f"{op1['op_desc']} -> {op2['op_desc']}", - '持仓天数': (op2['dt'] - op1['dt']).total_seconds() / (24 * 3600), - '盈亏比例': op2['price'] / op1['price'] - 1 if op1['op'] == Operate.LO else 1 - op2['price'] / op1['price'], - } - # 盈亏比例 转换成以 BP 为单位的收益,1BP = 0.0001 - pair['盈亏比例'] = round(pair['盈亏比例'] * 10000, 2) - return pair - @property def pairs(self): """开平交易列表 @@ -710,9 +695,27 @@ def pairs(self): 3. 持仓K线数,指基础周期K线数量 """ pairs = [] + for op1, op2 in zip(self.operates, self.operates[1:]): - if op1['op'] in [Operate.LO, Operate.SO]: - pairs.append(self.__two_operates_pair(op1, op2)) + if op1['op'] not in [Operate.LO, Operate.SO]: + continue + + ykr = op2['price'] / op1['price'] - 1 if op1['op'] == Operate.LO else 1 - op2['price'] / op1['price'] + pair = { + '标的代码': self.symbol, + '策略标记': self.name, + '交易方向': "多头" if op1['op'] == Operate.LO else "空头", + '开仓时间': op1['dt'], + '平仓时间': op2['dt'], + '开仓价格': op1['price'], + '平仓价格': op2['price'], + '持仓K线数': op2['bid'] - op1['bid'], + '事件序列': f"{op1['op_desc']} -> {op2['op_desc']}", + '持仓天数': (op2['dt'] - op1['dt']).total_seconds() / (24 * 3600), + '盈亏比例': round(ykr * 10000, 2), # 盈亏比例 转换成以 BP 为单位的收益,1BP = 0.0001 + } + pairs.append(pair) + return pairs def evaluate_pairs(self, trade_dir: str = "多空") -> dict: @@ -758,6 +761,63 @@ def evaluate_pairs(self, trade_dir: str = "多空") -> dict: return p + def evaluate_holds(self, trade_dir: str = "多空") -> dict: + """按持仓信号评估交易表现 + + :param trade_dir: 交易方向,可选值 ['多头', '空头', '多空'] + :return: 交易表现 + """ + holds = deepcopy(self.holds) + if trade_dir != '多空': + _OD = 1 if trade_dir == "多头" else -1 + for hold in holds: + if hold['pos'] != 0 and hold['pos'] != _OD: + hold['pos'] = 0 + + p = {"交易标的": self.symbol, "策略标记": self.name, "交易方向": trade_dir, + "开始时间": "", "结束时间": "", + '覆盖率': 0, '夏普': 0, '卡玛': 0, '最大回撤': 0, '年化收益': 0, '日胜率': 0} + + if len(holds) == 0 or all(x['pos'] == 0 for x in holds): + return p + + dfh = pd.DataFrame(holds) + dfh['n1b'] = (dfh['price'].shift(1) - dfh['price']) / dfh['price'] + dfh['trade_date'] = dfh['dt'].apply(lambda x: x.strftime('%Y-%m-%d')) + dfh['edge'] = dfh['n1b'] * dfh['pos'] # 持有下一根K线的边际收益 + + # 按日期聚合 + dfv = dfh.groupby('trade_date')['edge'].sum() + dfv = dfv.cumsum() + + yearly_n = 252 + yearly_ret = dfv.iloc[-1] * (yearly_n / len(dfv)) + sharp = dfv.diff().mean() / dfv.diff().std() * pow(yearly_n, 0.5) if dfv.diff().std() != 0 else 0 + df0 = dfv.shift(1).ffill().fillna(0) + mdd = (1 - (df0 + 1) / (df0 + 1).cummax()).max() + calmar = yearly_ret / mdd if mdd != 0 else 1 + + p.update({ + "开始时间": dfh['dt'].iloc[0].strftime('%Y-%m-%d'), + "结束时间": dfh['dt'].iloc[-1].strftime('%Y-%m-%d'), + '覆盖率': round(len(dfh[dfh['pos'] != 0]) / len(dfh), 4), + '夏普': round(sharp, 4), + '卡玛': round(calmar, 4), + '最大回撤': round(mdd, 4), + '年化收益': round(yearly_ret, 4), + '日胜率': round(sum(dfv > 0) / len(dfv), 4)}) + return p + + def evaluate(self, trade_dir: str = "多空") -> dict: + """评估交易表现 + + :param trade_dir: 交易方向,可选值 ['多头', '空头', '多空'] + :return: 交易表现 + """ + p = self.evaluate_pairs(trade_dir) + p.update(self.evaluate_holds(trade_dir)) + return p + def update(self, s: dict): """更新持仓状态 @@ -768,6 +828,7 @@ def update(self, s: dict): logger.warning(f"请检查信号传入:最新信号时间{s['dt']}在上次信号时间{self.end_dt}之前") return + self.pos_changed = False op = Operate.HO op_desc = "" for event in self.events: @@ -785,6 +846,7 @@ def update(self, s: dict): self.last_event = {'dt': dt, 'bid': bid, 'price': price, 'op': op, 'op_desc': op_desc} def __create_operate(_op, _op_desc): + self.pos_changed = True return {'symbol': self.symbol, 'dt': dt, 'bid': bid, 'price': price, 'op': _op, 'op_desc': _op_desc, 'pos': self.pos} @@ -851,5 +913,4 @@ def __create_operate(_op, _op_desc): self.pos = 0 self.operates.append(__create_operate(Operate.SE, f"平空@{self.timeout}K超时")) - self.holds.append({"dt": self.end_dt, 'pos': self.pos}) - + self.holds.append({"dt": self.end_dt, 'pos': self.pos, 'price': price, 'bid': bid}) diff --git a/czsc/signals/__init__.py b/czsc/signals/__init__.py index 28a9184a2..cf746f4ec 100644 --- a/czsc/signals/__init__.py +++ b/czsc/signals/__init__.py @@ -21,6 +21,8 @@ cxt_bi_break_V221126, cxt_sub_b3_V221212, cxt_zhong_shu_gong_zhen_V221221, + cxt_bi_end_V230222, + cxt_bi_end_V230224, ) @@ -54,6 +56,7 @@ bar_single_V230214, bar_amount_acc_V230214, bar_big_solid_V230215, + bar_vol_bs1_V230224, ) from czsc.signals.jcc import ( diff --git a/czsc/signals/bar.py b/czsc/signals/bar.py index 31b24f630..15fde1752 100644 --- a/czsc/signals/bar.py +++ b/czsc/signals/bar.py @@ -672,4 +672,39 @@ def bar_big_solid_V230215(c: CZSC, di: int = 1, n: int = 20, **kwargs): return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) +def bar_vol_bs1_V230224(c: CZSC, di: int = 1, n: int = 20, **kwargs): + """量价配合的高低点判断 + **信号逻辑:** + + 1. 高点看空:窗口内最近一根K线上影大于下影的两倍,同时最高价和成交量同时创新高 + 2. 反之,低点看多 + + **信号列表:** + + - Signal('15分钟_D2N34量价_BS1辅助_看多_任意_任意_0') + - Signal('15分钟_D2N34量价_BS1辅助_看空_任意_任意_0') + + :param c: CZSC 对象 + :param di: 倒数第i根K线 + :param n: 窗口大小 + :return: 信号字典 + """ + k1, k2, k3 = f"{c.freq.value}_D{di}N{n}量价_BS1辅助".split('_') + _bars = get_sub_elements(c.bars_raw, di=di, n=n) + mean_vol = np.mean([x.amount for x in _bars]) + + short_c1 = _bars[-1].high == max([x.high for x in _bars]) and _bars[-1].upper > 2 * _bars[-1].lower > 0 + short_c2 = _bars[-1].amount > mean_vol * 3 + + long_c1 = _bars[-1].low == min([x.low for x in _bars]) and _bars[-1].lower > 2 * _bars[-1].upper > 0 + long_c2 = _bars[-1].amount < mean_vol * 0.7 + + if short_c1 and short_c2: + v1 = '看空' + elif long_c1 and long_c2: + v1 = '看多' + else: + v1 = '其他' + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) \ No newline at end of file diff --git a/czsc/signals/cxt.py b/czsc/signals/cxt.py index 512cfb6da..3d72f8a74 100644 --- a/czsc/signals/cxt.py +++ b/czsc/signals/cxt.py @@ -8,9 +8,9 @@ import numpy as np from loguru import logger from typing import List -from czsc import CZSC, Signal +from czsc import CZSC from czsc.traders.base import CzscSignals -from czsc.objects import FX, BI, Direction, ZS +from czsc.objects import FX, BI, Direction, ZS, Mark from czsc.utils import get_sub_elements, create_single_signal from collections import OrderedDict @@ -331,3 +331,121 @@ def __is_zs(_bis): return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) +def cxt_bi_end_V230222(c: CZSC, **kwargs) -> OrderedDict: + """当前是最后笔的第几次新低底分型或新高顶分型,用于笔结束辅助 + + **信号逻辑:** + + 1. 取最后笔及未成笔的分型, + 2. 当前如果是顶分型,则看当前顶分型是否新高,是第几个新高 + 2. 当前如果是底分型,则看当前底分型是否新低,是第几个新低 + + **信号列表:** + + - Signal('15分钟_D1MO3_结束辅助_新低_第1次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第2次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第1次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第2次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第3次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第4次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第3次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第4次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第5次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第5次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第6次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第6次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第7次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第7次_任意_0') + + :param c: CZSC对象 + :param kwargs: + :return: 信号识别结果 + """ + max_overlap = int(kwargs.get('max_overlap', 3)) + k1, k2, k3 = f"{c.freq.value}_D1MO{max_overlap}_结束辅助".split('_') + v1 = '其他' + v2 = '其他' + + if not c.ubi_fxs: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + # 为了只取最后一笔以来的分型,没有用底层fx_list + fxs = [] + if c.bi_list: + fxs.extend(c.bi_list[-1].fxs[1:]) + ubi_fxs = c.ubi_fxs + for x in ubi_fxs: + if not fxs or x.dt > fxs[-1].dt: + fxs.append(x) + + # 出分型那刻出信号,或者分型和最后一根bar相差 max_overlap 根K线时间内 + if (fxs[-1].elements[-1].dt == c.bars_ubi[-1].dt) or (c.bars_raw[-1].id - fxs[-1].raw_bars[-1].id <= max_overlap): + if fxs[-1].mark == Mark.G: + up = [x for x in fxs if x.mark == Mark.G] + high_max = float('-inf') + cnt = 0 + for fx in up: + if fx.high > high_max: + cnt += 1 + high_max = fx.high + if fxs[-1].high == high_max: + v1 = '新高' + v2 = cnt + + else: + down = [x for x in fxs if x.mark == Mark.D] + low_min = float('inf') + cnt = 0 + for fx in down: + if fx.low < low_min: + cnt += 1 + low_min = fx.low + if fxs[-1].low == low_min: + v1 = '新低' + v2 = cnt + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=f"第{v2}次") + + +def cxt_bi_end_V230224(c: CZSC, **kwargs): + """量价配合的笔结束辅助 + + **信号逻辑:** + + 1. 向下笔结束:fx_b 内最低的那根K线下影大于上影的两倍,同时fx_b内的平均成交量小于当前笔的平均成交量的0.618 + 2. 向上笔结束:fx_b 内最高的那根K线上影大于下影的两倍,同时fx_b内的平均成交量大于当前笔的平均成交量的2倍 + + **信号列表:** + + - Signal('15分钟_D1MO3_笔结束V230224_看多_任意_任意_0') + - Signal('15分钟_D1MO3_笔结束V230224_看空_任意_任意_0') + + :param c: CZSC 对象 + :return: 信号字典 + """ + max_overlap = int(kwargs.get('max_overlap', 3)) + k1, k2, k3 = f"{c.freq.value}_D1MO{max_overlap}_笔结束V230224".split('_') + v1 = '其他' + if len(c.bi_list) <= 3: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + last_bi = c.bi_list[-1] + bi_bars = last_bi.raw_bars + bi_vol_mean = np.mean([x.vol for x in bi_bars]) + fx_bars = last_bi.fx_b.raw_bars + fx_vol_mean = np.mean([x.vol for x in fx_bars]) + + bar1 = fx_bars[np.argmin([x.low for x in fx_bars])] + bar2 = fx_bars[np.argmax([x.high for x in fx_bars])] + + if bar1.upper > bar1.lower * 2 and fx_vol_mean > bi_vol_mean * 2: + v1 = '看空' + elif 2 * bar2.upper < bar2.lower and fx_vol_mean < bi_vol_mean * 0.618: + v1 = '看多' + else: + v1 = '其他' + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + + diff --git a/czsc/signals/tas.py b/czsc/signals/tas.py index d0436be04..e3cce6149 100644 --- a/czsc/signals/tas.py +++ b/czsc/signals/tas.py @@ -364,7 +364,7 @@ def tas_macd_first_bs_V221216(c: CZSC, di: int = 1, **kwargs): s1_con1a = len(cross) > 3 and cross[-1]['类型'] == '金叉' and cross[-1]['慢线'] > 0 s1_con1b = len(cross) > 3 and cross[-1]['类型'] == '死叉' and up[-1]['慢线'] > 0 - s1_con2 = len(dn) > 3 and up[-2]['慢线'] > 0 and up[-3]['慢线'] > 0 + s1_con2 = len(up) > 3 and up[-2]['慢线'] > 0 and up[-3]['慢线'] > 0 s1_con3 = len(macd) > 10 and macd[-1] < macd[-2] if high_n > high_m and (s1_con1a or s1_con1b) and s1_con2 and s1_con3: v1 = "一卖" diff --git a/czsc/strategies.py b/czsc/strategies.py index 9876f5885..71f6a5e76 100644 --- a/czsc/strategies.py +++ b/czsc/strategies.py @@ -96,22 +96,39 @@ def init_bar_generator(self, bars: List[RawBar], **kwargs): bars2 = [x for x in bars if x.dt > bg.end_dt] return bg, bars2 - def init_trader(self, bars: List[RawBar], **kwargs): + def init_trader(self, bars: List[RawBar], **kwargs) -> CzscTrader: """使用策略定义初始化一个 CzscTrader 对象 + **注意:** 这里会将所有持仓策略在 sdt 之后的交易信号计算出来并缓存在持仓策略实例内部,所以初始化的过程本身也是回测的过程。 + :param bars: 基础周期K线 :param kwargs: bg 已经初始化好的BarGenerator对象,如果传入了bg,则忽略sdt和n参数 sdt 初始化开始日期 n 初始化最小K线数量 - :return: + :return: 完成策略初始化后的 CzscTrader 对象 """ bg, bars2 = self.init_bar_generator(bars, **kwargs) - trader = CzscTrader(bg, get_signals=deepcopy(self.get_signals), positions=deepcopy(self.positions)) + trader = CzscTrader(bg=bg, get_signals=deepcopy(self.get_signals), positions=deepcopy(self.positions)) for bar in bars2: trader.on_bar(bar) return trader + def backtest(self, bars: List[RawBar], **kwargs) -> CzscTrader: + trader = self.init_trader(bars, **kwargs) + return trader + + def dummy(self, sigs: List[dict], **kwargs) -> CzscTrader: + """使用信号缓存进行策略回测 + + :param sigs: 信号缓存,一般指 generate_czsc_signals 函数计算的结果缓存 + :return: 完成策略回测后的 CzscTrader 对象 + """ + trader = CzscTrader(positions=deepcopy(self.positions)) + for sig in sigs: + trader.on_sig(sig) + return trader + def replay(self, bars: List[RawBar], res_path, **kwargs): """交易策略交易过程回放 @@ -133,7 +150,7 @@ def replay(self, bars: List[RawBar], res_path, **kwargs): os.makedirs(res_path, exist_ok=exist_ok) bg, bars2 = self.init_bar_generator(bars, **kwargs) - trader = CzscTrader(bg, get_signals=deepcopy(self.get_signals), positions=deepcopy(self.positions)) + trader = CzscTrader(bg=bg, get_signals=deepcopy(self.get_signals), positions=deepcopy(self.positions)) for position in trader.positions: pos_path = os.path.join(res_path, position.name) os.makedirs(pos_path, exist_ok=exist_ok) @@ -252,3 +269,117 @@ def create_pos_c(self): interval=0, timeout=20, stop_loss=50) return pos + +class CzscStocksBeta(CzscStrategyBase): + """CZSC 股票 Beta 策略""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @classmethod + def get_signals(cls, cat) -> OrderedDict: + s = OrderedDict({"symbol": cat.symbol, "dt": cat.end_dt, "close": cat.latest_price}) + s.update(signals.bar_operate_span_V221111(cat.kas['15分钟'], k1='全天', span=('0935', '1450'))) + s.update(signals.bar_operate_span_V221111(cat.kas['15分钟'], k1='上午', span=('0935', '1130'))) + s.update(signals.bar_operate_span_V221111(cat.kas['15分钟'], k1='下午', span=('1300', '1450'))) + s.update(signals.bar_zdt_V221110(cat.kas['15分钟'], di=1)) + + s.update(signals.tas_macd_base_V221028(cat.kas['60分钟'], di=1, key='macd')) + s.update(signals.tas_macd_base_V221028(cat.kas['60分钟'], di=5, key='macd')) + + s.update(signals.tas_ma_base_V221101(cat.kas["日线"], di=1, ma_type='SMA', timeperiod=5)) + s.update(signals.tas_ma_base_V221101(cat.kas["日线"], di=2, ma_type='SMA', timeperiod=5)) + s.update(signals.tas_ma_base_V221101(cat.kas["日线"], di=5, ma_type='SMA', timeperiod=5)) + return s + + @property + def positions(self): + beta1 = self.create_beta1() + beta2 = self.create_beta2() + pos_list = [deepcopy(beta1), deepcopy(beta2)] + return pos_list + + @property + def freqs(self): + return ['日线', '60分钟', '30分钟', '15分钟'] + + def create_beta1(self): + """60分钟MACD金叉 + + **策略描述:** + + 1. 60分钟MACD金叉开多 + 2. 60分钟MACD死叉平多 + """ + opens = [ + {'name': '开多', + 'operate': '开多', + 'signals_all': [], + 'signals_any': [], + 'signals_not': ['15分钟_D1K_ZDT_涨停_任意_任意_0'], + 'factors': [{'name': '60分钟MACD金叉', + 'signals_all': ['全天_0935_1450_是_任意_任意_0', + '60分钟_D1K_MACD_多头_任意_任意_0', + '60分钟_D5K_MACD_空头_任意_任意_0'], + 'signals_any': [], + 'signals_not': []} + ]}, + ] + + exits = [ + {'name': '平多', + 'operate': '平多', + 'signals_all': ['全天_0935_1450_是_任意_任意_0'], + 'signals_any': [], + 'signals_not': ['15分钟_D1K_ZDT_跌停_任意_任意_0'], + 'factors': [{'name': '60分钟MACD死叉', + 'signals_all': ['60分钟_D1K_MACD_空头_任意_任意_0'], + 'signals_any': [], + 'signals_not': []}]}, + + ] + pos = Position(name="60分钟MACD金叉", symbol=self.symbol, + opens=[Event.load(x) for x in opens], + exits=[Event.load(x) for x in exits], + interval=3600 * 4, timeout=16 * 30, stop_loss=500) + return pos + + def create_beta2(self): + """5日线多头 + + **策略特征:** + + + """ + opens = [ + {'name': '开多', + 'operate': '开多', + 'signals_all': [], + 'signals_any': [], + 'signals_not': ['15分钟_D1K_ZDT_涨停_任意_任意_0'], + 'factors': [{'name': '站上SMA5', + 'signals_all': ['上午_0935_1130_是_任意_任意_0', + '日线_D1K_SMA5_多头_任意_任意_0', + '日线_D5K_SMA5_空头_任意_任意_0'], + 'signals_any': [], + 'signals_not': []}]} + ] + + exits = [ + {'name': '平多', + 'operate': '平多', + 'signals_all': [], + 'signals_any': [], + 'signals_not': ['15分钟_D1K_ZDT_跌停_任意_任意_0'], + 'factors': [{'name': '跌破SMA5', + 'signals_all': ['下午_1300_1450_是_任意_任意_0', + '日线_D1K_SMA5_空头_任意_任意_0', + '日线_D2K_SMA5_多头_任意_任意_0'], + 'signals_any': [], + 'signals_not': []}]} + ] + pos = Position(name="5日线多头", symbol=self.symbol, + opens=[Event.load(x) for x in opens], + exits=[Event.load(x) for x in exits], + interval=3600 * 4, timeout=16 * 40, stop_loss=500) + return pos diff --git a/czsc/traders/base.py b/czsc/traders/base.py index 0d694998a..3f4c30425 100644 --- a/czsc/traders/base.py +++ b/czsc/traders/base.py @@ -137,6 +137,7 @@ def generate_czsc_signals(bars: List[RawBar], get_signals: Callable, freqs: List :param df: 是否返回 df 格式的信号计算结果,默认 False :return: 信号计算结果 """ + freqs = [freq for freq in freqs if freq != bars[0].freq.value] sdt = pd.to_datetime(sdt) bars_left = [x for x in bars if x.dt < sdt] if len(bars_left) <= init_n: @@ -226,9 +227,24 @@ def check_signals_acc(bars: List[RawBar], get_signals: Callable, delta_days: int class CzscTrader(CzscSignals): """缠中说禅技术分析理论之多级别联立交易决策类(支持多策略独立执行)""" - def __init__(self, bg: BarGenerator = None, get_signals: Callable = None, positions: List[Position] = None): + def __init__(self, bg: BarGenerator = None, get_signals: Callable = None, + positions: List[Position] = None, ensemble_method: Union[AnyStr, Callable] = "mean"): + """ + + :param bg: bar generator 对象 + :param get_signals: 信号计算函数,输入是 CzscSignals 对象,输出是信号字典 + :param ensemble_method: 多个仓位集成一个仓位的方法,可选值 mean, vote, max;也可以传入一个回调函数 + + 假设有三个仓位对象,当前仓位分别是 1, 1, -1 + mean - 平均仓位,pos = np.mean([1, 1, -1]) = 0.33 + vote - 投票表决,pos = 1 + max - 取最大,pos = 1 + + 对于传入回调函数的情况,输入是 self.positions + """ super().__init__(bg, get_signals=get_signals) self.positions = positions + self.__ensemble_method = ensemble_method def update(self, bar: RawBar) -> None: """输入基础周期已完成K线,更新信号,更新仓位 @@ -264,7 +280,17 @@ def on_bar(self, bar: RawBar) -> None: """ self.update(bar) - def get_ensemble_pos(self, method: Union[AnyStr, Callable] = "mean"): + @property + def pos_changed(self) -> bool: + """判断仓位是否发生变化 + + :return: True/False + """ + if not self.positions: + return False + return any([position.pos_changed for position in self.positions]) + + def get_ensemble_pos(self, method: Union[AnyStr, Callable] = None) -> float: """获取多个仓位的集成仓位 :param method: 多个仓位集成一个仓位的方法,可选值 mean, vote, max;也可以传入一个回调函数 @@ -281,6 +307,7 @@ def get_ensemble_pos(self, method: Union[AnyStr, Callable] = "mean"): if not self.positions: return 0 + method = self.__ensemble_method if not method else method if isinstance(method, str): method = method.lower() pos_seq = [x.pos for x in self.positions] diff --git a/czsc/traders/performance.py b/czsc/traders/performance.py index 57bcc8e35..8fc6e2355 100644 --- a/czsc/traders/performance.py +++ b/czsc/traders/performance.py @@ -178,10 +178,10 @@ def get_pairs_statistics(df_pairs: pd.DataFrame): "平均持仓天数": round(df_pairs['持仓天数'].mean(), 2), "平均持仓K线数": round(df_pairs['持仓K线数'].mean(), 2), - "平均单笔收益": round(df_pairs['盈亏比例'].mean() * 10000, 2), - "单笔收益标准差": round(df_pairs['盈亏比例'].std() * 10000, 2), - "最大单笔收益": round(df_pairs['盈亏比例'].max() * 10000, 2), - "最小单笔收益": round(df_pairs['盈亏比例'].min() * 10000, 2), + "平均单笔收益": round(df_pairs['盈亏比例'].mean(), 4), + "单笔收益标准差": round(df_pairs['盈亏比例'].std(), 4), + "最大单笔收益": round(df_pairs['盈亏比例'].max(), 4), + "最小单笔收益": round(df_pairs['盈亏比例'].min(), 4), "交易胜率": win_pct, "单笔盈亏比": single_gain_loss_rate, diff --git a/czsc/utils/__init__.py b/czsc/utils/__init__.py index 61478e189..2c9c03f0d 100644 --- a/czsc/utils/__init__.py +++ b/czsc/utils/__init__.py @@ -13,6 +13,7 @@ from .io import dill_dump, dill_load, read_json, save_json from .sig import check_pressure_support, check_gap_info, is_bis_down, is_bis_up, get_sub_elements from .sig import same_dir_counts, fast_slow_cross, count_last_same, create_single_signal +from .plotly_plot import KlineChart def x_round(x: [float, int], digit=4): diff --git a/czsc/utils/bar_generator.py b/czsc/utils/bar_generator.py index 55e781c74..9d52a0457 100644 --- a/czsc/utils/bar_generator.py +++ b/czsc/utils/bar_generator.py @@ -102,15 +102,13 @@ def resample_bars(df: pd.DataFrame, target_freq: Union[Freq, AnyStr], raw_bars=T if not isinstance(target_freq, Freq): target_freq = Freq(target_freq) - k_cols = ['symbol', 'dt', 'open', 'close', 'high', 'low', 'vol', 'amount'] - df = df[k_cols] df['freq_edt'] = df['dt'].apply(lambda x: freq_end_time(x, target_freq)) dfk1 = df.groupby('freq_edt').agg( {'symbol': 'first', 'dt': 'last', 'open': 'first', 'close': 'last', 'high': 'max', 'low': 'min', 'vol': 'sum', 'amount': 'sum', 'freq_edt': 'last'}) dfk1.reset_index(drop=True, inplace=True) dfk1['dt'] = dfk1['freq_edt'] - dfk1 = dfk1[k_cols] + dfk1 = dfk1[['symbol', 'dt', 'open', 'close', 'high', 'low', 'vol', 'amount']] if raw_bars: _bars = [] diff --git a/czsc/utils/plotly_plot.py b/czsc/utils/plotly_plot.py new file mode 100644 index 000000000..309a50d54 --- /dev/null +++ b/czsc/utils/plotly_plot.py @@ -0,0 +1,151 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2023/2/26 15:03 +describe: 使用 Plotly 构建绘图模块 +""" +import os +import webbrowser +import numpy as np +import pandas as pd +from plotly import graph_objects as go +from plotly.subplots import make_subplots +from czsc.utils.cache import home_path +from czsc.utils.ta import MACD + + +class KlineChart: + """K线绘图工具类 + + plotly 参数详解: https://www.jianshu.com/p/4f4daf47cc85 + + """ + def __init__(self, **kwargs): + # 子图数量 + self.n_rows = kwargs.get('n_rows', 3) + + if self.n_rows == 3: + row_heights = [0.6, 0.2, 0.2] + elif self.n_rows == 4: + row_heights = [0.55, 0.15, 0.15, 0.15] + elif self.n_rows == 5: + row_heights = [0.4, 0.15, 0.15, 0.15, 0.15] + else: + raise ValueError("n_rows 只能是 3, 4, 5") + + self.color_red = 'rgba(249,41,62,0.7)' + self.color_green = 'rgba(0,170,59,0.7)' + fig = make_subplots(rows=self.n_rows, cols=1, shared_xaxes=True, row_heights=row_heights, + horizontal_spacing=0, vertical_spacing=0) + + fig = fig.update_yaxes(showgrid=True, zeroline=False, automargin=True, fixedrange=True) + fig = fig.update_xaxes(type='category', rangeslider_visible=False, showgrid=False, automargin=True, + showticklabels=False) + + # https://plotly.com/python/reference/layout/ + fig.update_layout( + title=dict(text=kwargs.get('title', 'K线图'), yanchor='top'), + margin=dict(t=10, b=10), + legend=dict(orientation='h', yanchor="top", y=1.01, xanchor="center", x=0.5), + template="plotly_dark", + hovermode="x unified", + hoverlabel=dict(bgcolor='rgba(255,255,255,0.1)'), # 透明,更容易看清后面k线 + dragmode='pan', + legend_title_font_color="red", + ) + self.fig = fig + + def add_kline(self, kline: pd.DataFrame, name: str = "K线", **kwargs): + """绘制K线""" + if 'text' not in kline.columns: + kline['text'] = "" + + candle = go.Candlestick(x=kline['dt'], open=kline["open"], high=kline["high"], low=kline["low"], + close=kline["close"], text=kline["text"], name=name, showlegend=True, + increasing_line_color=self.color_red, decreasing_line_color=self.color_green, + increasing_fillcolor=self.color_red, decreasing_fillcolor=self.color_green) + self.fig.add_trace(candle, row=1, col=1) + + def add_vol(self, kline: pd.DataFrame, row=2, **kwargs): + """绘制成交量图""" + df = kline.copy() + df['vol_color'] = np.where(df['close'] > df['open'], self.color_red, self.color_green) + self.add_bar_indicator(df['dt'], df['vol'], color=df['vol_color'], name="成交量", row=row, show_legend=False) + + def add_sma(self, kline: pd.DataFrame, row=1, ma_seq=(5, 10, 20), visible=False, **kwargs): + """绘制均线图""" + df = kline.copy() + line_width = kwargs.get('line_width', 0.6) + for ma in ma_seq: + self.add_scatter_indicator(df['dt'], df['close'].rolling(ma).mean(), name=f"MA{ma}", + row=row, line_width=line_width, visible=visible, show_legend=True) + + def add_macd(self, kline: pd.DataFrame, row=3, **kwargs): + """绘制MACD图""" + df = kline.copy() + fastperiod = kwargs.get('fastperiod', 12) + slowperiod = kwargs.get('slowperiod', 26) + signalperiod = kwargs.get('signalperiod', 9) + + diff, dea, macd = MACD(df["close"], fastperiod=fastperiod, slowperiod=slowperiod, signalperiod=signalperiod) + macd_colors = np.where(macd > 0, self.color_red, self.color_green) + self.add_scatter_indicator(df['dt'], diff, name="DIFF", row=row, + line_color="rgba(184, 117, 225, 1.0)", show_legend=False, line_width=0.6) + self.add_scatter_indicator(df['dt'], dea, name="DEA", row=row, + line_color="rgba(255, 0, 0, 1.0)", show_legend=False, line_width=0.6) + self.add_bar_indicator(df['dt'], macd, name="MACD", row=row, color=macd_colors, show_legend=False) + + def add_scatter_indicator(self, x, y, name: str, row: int, text=None, **kwargs): + """绘制线性指标 + + :param x: 指标的x轴 + :param y: 指标的y轴 + :param name: 指标名称 + :param row: 放入第几个子图 + :param text: 文本说明 + :param kwargs: + :return: + """ + mode = kwargs.get('mode', 'text+lines') + line_color = kwargs.get('line_color', None) + line_width = kwargs.get('line_width', None) + hover_template = kwargs.get('hover_template', '%{y:.3f}') + show_legend = kwargs.get('show_legend', True) + visible = True if kwargs.get('visible', True) else 'legendonly' + + scatter = go.Scatter(x=x, y=y, name=name, text=text, line_width=line_width, line_color=line_color, mode=mode, + hovertemplate=hover_template, showlegend=show_legend, visible=visible, opacity=0.4) + self.fig.add_trace(scatter, row=row, col=1) + + def add_bar_indicator(self, x, y, name: str, row: int, color=None, **kwargs): + """绘制条形图指标 + + :param x: 指标的x轴 + :param y: 指标的y轴 + :param name: 指标名称 + :param row: 放入第几个子图 + :param color: 指标的颜色,可以是单个颜色,也可以是一个列表,列表长度和y的长度一致,指示每个y的颜色 + 比如:color = 'rgba(249,41,62,0.7)' 或者 color = ['rgba(249,41,62,0.7)', 'rgba(0,170,59,0.7)'] + :param kwargs: + :return: + """ + hover_template = kwargs.get('hover_template', '%{y:.3f}') + show_legend = kwargs.get('show_legend', True) + visible = kwargs.get('visible', True) + if color is None: + color = self.color_red + + bar = go.Bar(x=x, y=y, marker_line_color=color, marker_color=color, name=name, + showlegend=show_legend, hovertemplate=hover_template, visible=visible, base=True) + self.fig.add_trace(bar, row=row, col=1) + + def open_in_browser(self, file_name: str = None, **kwargs): + """在浏览器中打开""" + if not file_name: + file_name = os.path.join(home_path, "kline_chart.html") + self.fig.update_layout(**kwargs) + self.fig.write_html(file_name) + webbrowser.open(file_name) + + diff --git a/examples/czsc_tushare_stream.py b/examples/czsc_tushare_stream.py new file mode 100644 index 000000000..9a540c351 --- /dev/null +++ b/examples/czsc_tushare_stream.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2022/7/12 14:22 +describe: 用 tushare 数据复盘K线行情 +环境: +streamlit-echarts +""" +import sys +sys.path.insert(0, '.') +sys.path.insert(0, '..') +import streamlit as st +from datetime import datetime +import streamlit_echarts as st_echarts +from czsc.traders.base import CzscSignals, BarGenerator +from czsc.utils import freqs_sorted +from czsc.data import TsDataCache, get_symbols + + +st.set_page_config(layout="wide") + +dc = TsDataCache(data_path=r"D:\ts_data") + +with st.sidebar: + st.title("使用 tushare 数据复盘K线行情") + symbol = st.selectbox("选择合约", options=get_symbols(dc, 'index'), index=0) + edt = st.date_input("结束时间", value=datetime.now()) + + +ts_code, asset = symbol.split('#') +bars = dc.pro_bar_minutes(ts_code=ts_code, asset=asset, freq='5min', sdt="20150101", edt=edt) +st.success(f'{symbol} K线加载完成!', icon="✅") +freqs = ['5分钟', '15分钟', '30分钟', '60分钟', '日线', '周线', '月线'] +counts = 100 +bg = BarGenerator(base_freq=freqs[0], freqs=freqs[1:], max_count=1000) +for bar in bars[:-counts]: + bg.update(bar) +cs, remain_bars = CzscSignals(bg), bars[-counts:] +for bar in remain_bars: + cs.update_signals(bar) + +for freq in freqs: + st.subheader(f"{freq} K线") + st_echarts.st_pyecharts(cs.kas[freq].to_echarts(), width='100%', height='600px') + + + diff --git a/examples/signals_dev/bar_vol_bs1_V230224.py b/examples/signals_dev/bar_vol_bs1_V230224.py new file mode 100644 index 000000000..300fedfee --- /dev/null +++ b/examples/signals_dev/bar_vol_bs1_V230224.py @@ -0,0 +1,79 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2021/12/13 17:48 +describe: 验证信号计算的准确性,仅适用于缠论笔相关的信号, + 技术指标构建的信号,用这个工具检查不是那么方便 +""" +import sys + +sys.path.insert(0, '..') +import os +import numpy as np +from loguru import logger +from collections import OrderedDict +from czsc.data.ts_cache import TsDataCache +from czsc import CZSC, Signal +from czsc.traders.base import CzscTrader, check_signals_acc +from czsc.signals.tas import update_ma_cache +from czsc.utils import get_sub_elements, create_single_signal +from czsc import signals + +os.environ['czsc_verbose'] = '1' + +data_path = r'C:\ts_data' +dc = TsDataCache(data_path, sdt='2010-01-01', edt='20211209') + +symbol = '000001.SZ' +bars = dc.pro_bar_minutes(ts_code=symbol, asset='E', freq='15min', + sdt='20181101', edt='20210101', adj='qfq', raw_bar=True) + + +def bar_vol_bs1_V230224(c: CZSC, di: int = 1, n: int = 20, **kwargs): + """量价配合的高低点判断 + + **信号逻辑:** + + 1. 高点看空:窗口内最近一根K线上影大于下影的两倍,同时最高价和成交量同时创新高 + 2. 反之,低点看多 + + **信号列表:** + + - Signal('15分钟_D2N34量价_BS1辅助_看多_任意_任意_0') + - Signal('15分钟_D2N34量价_BS1辅助_看空_任意_任意_0') + + :param c: CZSC 对象 + :param di: 倒数第i根K线 + :param n: 窗口大小 + :return: 信号字典 + """ + k1, k2, k3 = f"{c.freq.value}_D{di}N{n}量价_BS1辅助".split('_') + _bars = get_sub_elements(c.bars_raw, di=di, n=n) + mean_vol = np.mean([x.amount for x in _bars]) + + short_c1 = _bars[-1].high == max([x.high for x in _bars]) and _bars[-1].upper > 2 * _bars[-1].lower > 0 + short_c2 = _bars[-1].amount > mean_vol * 3 + + long_c1 = _bars[-1].low == min([x.low for x in _bars]) and _bars[-1].lower > 2 * _bars[-1].upper > 0 + long_c2 = _bars[-1].amount < mean_vol * 0.7 + + if short_c1 and short_c2: + v1 = '看空' + elif long_c1 and long_c2: + v1 = '看多' + else: + v1 = '其他' + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def get_signals(cat: CzscTrader) -> OrderedDict: + s = OrderedDict({"symbol": cat.symbol, "dt": cat.end_dt, "close": cat.latest_price}) + # 使用缓存来更新信号的方法 + s.update(bar_vol_bs1_V230224(cat.kas['15分钟'], di=2, n=34)) + return s + + +if __name__ == '__main__': + check_signals_acc(bars, get_signals) diff --git a/examples/signals_dev/cxt_bi_end_V230222.py b/examples/signals_dev/cxt_bi_end_V230222.py new file mode 100644 index 000000000..b99dbb84b --- /dev/null +++ b/examples/signals_dev/cxt_bi_end_V230222.py @@ -0,0 +1,125 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2021/12/13 17:48 +describe: 验证信号计算的准确性,仅适用于缠论笔相关的信号, + 技术指标构建的信号,用这个工具检查不是那么方便 +""" +import sys +sys.path.insert(0, '..') +import os +import numpy as np +from loguru import logger +from collections import OrderedDict +from czsc.data.ts_cache import TsDataCache +from czsc import CZSC, Signal +from czsc.objects import Mark +from czsc.traders.base import CzscTrader, check_signals_acc +from czsc.signals.tas import update_ma_cache +from czsc.utils import get_sub_elements, create_single_signal +from czsc import signals + + +os.environ['czsc_verbose'] = '1' + +data_path = r'C:\ts_data' +dc = TsDataCache(data_path, sdt='2010-01-01', edt='20211209') + +symbol = '000001.SZ' +bars = dc.pro_bar_minutes(ts_code=symbol, asset='E', freq='15min', + sdt='20181101', edt='20210101', adj='qfq', raw_bar=True) + + +def cxt_bi_end_V230222(c: CZSC, **kwargs) -> OrderedDict: + """当前是最后笔的第几次新低底分型或新高顶分型,用于笔结束辅助 + + **信号逻辑:** + + 1. 取最后笔及未成笔的分型, + 2. 当前如果是顶分型,则看当前顶分型是否新高,是第几个新高 + 2. 当前如果是底分型,则看当前底分型是否新低,是第几个新低 + + **信号列表:** + + - Signal('15分钟_D1MO3_结束辅助_新低_第1次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第2次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第1次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第2次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第3次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第4次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第3次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第4次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第5次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第5次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第6次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第6次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新高_第7次_任意_0') + - Signal('15分钟_D1MO3_结束辅助_新低_第7次_任意_0') + + :param c: CZSC对象 + :param kwargs: + :return: 信号识别结果 + """ + max_overlap = int(kwargs.get('max_overlap', 3)) + k1, k2, k3 = f"{c.freq.value}_D1MO{max_overlap}_结束辅助".split('_') + v1 = '其他' + v2 = '其他' + + if not c.ubi_fxs: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + # 为了只取最后一笔以来的分型,没有用底层fx_list + fxs = [] + if c.bi_list: + fxs.extend(c.bi_list[-1].fxs[1:]) + ubi_fxs = c.ubi_fxs + for x in ubi_fxs: + if not fxs or x.dt > fxs[-1].dt: + fxs.append(x) + + # 出分型那刻出信号,或者分型和最后一根bar相差 max_overlap 根K线时间内 + if (fxs[-1].elements[-1].dt == c.bars_ubi[-1].dt) or (c.bars_raw[-1].id - fxs[-1].raw_bars[-1].id <= max_overlap): + if fxs[-1].mark == Mark.G: + up = [x for x in fxs if x.mark == Mark.G] + high_max = float('-inf') + cnt = 0 + for fx in up: + if fx.high > high_max: + cnt += 1 + high_max = fx.high + if fxs[-1].high == high_max: + v1 = '新高' + v2 = cnt + + else: + down = [x for x in fxs if x.mark == Mark.D] + low_min = float('inf') + cnt = 0 + for fx in down: + if fx.low < low_min: + cnt += 1 + low_min = fx.low + if fxs[-1].low == low_min: + v1 = '新低' + v2 = cnt + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=f"第{v2}次") + + +def get_signals(cat: CzscTrader) -> OrderedDict: + s = OrderedDict({"symbol": cat.symbol, "dt": cat.end_dt, "close": cat.latest_price}) + # 使用缓存来更新信号的方法 + s.update(cxt_bi_end_V230222(cat.kas['15分钟'])) + return s + + +if __name__ == '__main__': + check_signals_acc(bars, get_signals) + + + + + + + diff --git a/examples/signals_dev/cxt_bi_end_V230224.py b/examples/signals_dev/cxt_bi_end_V230224.py new file mode 100644 index 000000000..d40a52f10 --- /dev/null +++ b/examples/signals_dev/cxt_bi_end_V230224.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2021/12/13 17:48 +describe: 验证信号计算的准确性,仅适用于缠论笔相关的信号, + 技术指标构建的信号,用这个工具检查不是那么方便 +""" +import sys + +sys.path.insert(0, '..') +import os +import numpy as np +from loguru import logger +from collections import OrderedDict +from czsc.data.ts_cache import TsDataCache +from czsc import CZSC, Signal +from czsc.traders.base import CzscTrader, check_signals_acc +from czsc.signals.tas import update_ma_cache +from czsc.utils import get_sub_elements, create_single_signal +from czsc import signals + +os.environ['czsc_verbose'] = '1' + +data_path = r'C:\ts_data' +dc = TsDataCache(data_path, sdt='2010-01-01', edt='20211209') + +symbol = '000001.SZ' +bars = dc.pro_bar_minutes(ts_code=symbol, asset='E', freq='15min', + sdt='20181101', edt='20210101', adj='qfq', raw_bar=True) + + +def cxt_bi_end_V230224(c: CZSC, **kwargs): + """量价配合的笔结束辅助 + + **信号逻辑:** + + 1. 向下笔结束:fx_b 内最低的那根K线下影大于上影的两倍,同时fx_b内的平均成交量小于当前笔的平均成交量的0.618 + 2. 向上笔结束:fx_b 内最高的那根K线上影大于下影的两倍,同时fx_b内的平均成交量大于当前笔的平均成交量的2倍 + + **信号列表:** + + - Signal('15分钟_D1MO3_笔结束V230224_看多_任意_任意_0') + - Signal('15分钟_D1MO3_笔结束V230224_看空_任意_任意_0') + + :param c: CZSC 对象 + :return: 信号字典 + """ + max_overlap = int(kwargs.get('max_overlap', 3)) + k1, k2, k3 = f"{c.freq.value}_D1MO{max_overlap}_笔结束V230224".split('_') + v1 = '其他' + if len(c.bi_list) <= 3: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + last_bi = c.bi_list[-1] + bi_bars = last_bi.raw_bars + bi_vol_mean = np.mean([x.vol for x in bi_bars]) + fx_bars = last_bi.fx_b.raw_bars + fx_vol_mean = np.mean([x.vol for x in fx_bars]) + + bar1 = fx_bars[np.argmin([x.low for x in fx_bars])] + bar2 = fx_bars[np.argmax([x.high for x in fx_bars])] + + if bar1.upper > bar1.lower * 2 and fx_vol_mean > bi_vol_mean * 2: + v1 = '看空' + elif 2 * bar2.upper < bar2.lower and fx_vol_mean < bi_vol_mean * 0.618: + v1 = '看多' + else: + v1 = '其他' + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def get_signals(cat: CzscTrader) -> OrderedDict: + s = OrderedDict({"symbol": cat.symbol, "dt": cat.end_dt, "close": cat.latest_price}) + # 使用缓存来更新信号的方法 + s.update(cxt_bi_end_V230224(cat.kas['15分钟'], max_overlap=3)) + return s + + +if __name__ == '__main__': + check_signals_acc(bars, get_signals) diff --git a/examples/ts_check_signal_acc.py b/examples/ts_check_signal_acc.py index cb43d80f9..7ee68c316 100644 --- a/examples/ts_check_signal_acc.py +++ b/examples/ts_check_signal_acc.py @@ -30,94 +30,10 @@ sdt='20181101', edt='20210101', adj='qfq', raw_bar=True) -def bar_big_solid_V230215(c: CZSC, di: int = 1, n: int = 20, **kwargs): - """窗口内最大实体K线的中间价区分多空 - - **信号逻辑:** - - 1. 找到窗口内最大实体K线, 据其中间位置区分多空 - - **信号列表:** - - - Signal('日线_D1N10_MID_看空_大阳_任意_0') - - Signal('日线_D1N10_MID_看空_大阴_任意_0') - - Signal('日线_D1N10_MID_看多_大阴_任意_0') - - Signal('日线_D1N10_MID_看多_大阳_任意_0') - - :param c: CZSC 对象 - :param di: 倒数第i根K线 - :param n: 窗口大小 - :return: 信号字典 - """ - k1, k2, k3 = f"{c.freq.value}_D{di}N{n}_MID".split('_') - _bars = get_sub_elements(c.bars_raw, di=di, n=n) - - # 找到窗口内最大实体K线 - max_i = np.argmax([x.solid for x in _bars]) - max_solid_bar = _bars[max_i] - max_solid_mid = min(max_solid_bar.open, max_solid_bar.close) + 0.5 * max_solid_bar.solid - - v1 = '看多' if c.bars_raw[-1].close > max_solid_mid else '看空' - v2 = '大阳' if max_solid_bar.close > max_solid_bar.open else '大阴' - return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) - - -def bar_first_bs_V230217(c: CZSC, di: int = 1, n: int = 10, ma_type='SMA', ma_seq: int = 5, **kwargs) -> OrderedDict: - """ - - - - Signal('日线_D1N10SMA5_BS1辅助_一买_任意_任意_0') - - Signal('日线_D1N10SMA5_BS1辅助_一卖_任意_任意_0') - """ - assert 5 <= n <= 50 - key = update_ma_cache(c, ma_type, ma_seq) - k1, k2, k3 = f"{c.freq.value}_D{di}N{n}{ma_type}{ma_seq}_BS1辅助".split('_') - v1 = '其他' - if len(c.bars_raw) < n + 5: - return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) - - _bars = get_sub_elements(c.bars_raw, di=di, n=n) - sma = [x.cache[key] for x in _bars] - low = [x.low for x in _bars] - _open = [x.open for x in _bars] - close = [x.close for x in _bars] - high = [x.high for x in _bars] - - # 窗口N内的K线的最低点全部小于SMA5 - condition_1_down = np.all(np.array(sma) > np.array(low)) - condition_1_up = np.all(np.array(sma) < np.array(high)) - - n1, m1 = 0, 0 - for i in range(len(low)): - if close[i] < _open[i]: - n1 += 1 - if close[i] > _open[i]: - m1 += 1 - condition_2_down = True if (n1 / len(low)) > 0.6 else False - condition_2_up = True if (m1 / len(low)) > 0.6 else False - - # 最近三根K线创新低 - condition_3_down = True if min(low[-3:]) < min(low[:-3]) else False - condition_3_up = True if max(high[-3:]) > max(high[:-3]) else False - - # 最后一根K线收在MA5之上/下 - condition_4_down = True if close[-1] > sma[-1] else False - condition_4_up = True if close[-1] < sma[-1] else False - - if condition_1_down and condition_2_down and condition_3_down and condition_4_down: - v1 = '一买' - elif condition_1_up and condition_2_up and condition_3_up and condition_4_up: - v1 = '一卖' - else: - v1 = '其他' - - return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) - - def get_signals(cat: CzscTrader) -> OrderedDict: s = OrderedDict({"symbol": cat.symbol, "dt": cat.end_dt, "close": cat.latest_price}) # 使用缓存来更新信号的方法 - s.update(bar_big_solid_V230215(cat.kas['日线'], di=1, n=10)) + s.update(signals.tas_macd_first_bs_V221216(cat.kas['日线'], di=1)) return s diff --git a/examples/use_czsc_trader.py b/examples/use_czsc_trader.py index 08d25174a..1f679dd2e 100644 --- a/examples/use_czsc_trader.py +++ b/examples/use_czsc_trader.py @@ -39,11 +39,15 @@ def use_czsc_trader_by_qmt(): print(bars[-1]) # 初始化交易对象 - # trader = tactic.init_trader(bars, sdt='20200801') + trader = tactic.init_trader(bars, sdt='20200801') - # 执行策略回放,生成交易快照文件 - trader = tactic.replay(bars, res_path=r"C:\ts_data_czsc\trade_replay_test", sdt='20170101', refresh=True) - print(trader.positions[0].evaluate_pairs()) + # 执行策略回放,查看交易结果 + for i in range(len(trader.positions)): + print(trader.positions[i].evaluate_pairs()) + + # # 执行策略回放,生成交易快照文件 + # trader = tactic.replay(bars, res_path=r"C:\ts_data_czsc\trade_replay_test", sdt='20170101', refresh=True) + # print(trader.positions[0].evaluate_pairs()) def example_qmt_manager(): diff --git a/requirements.txt b/requirements.txt index 6ac215271..c8b276691 100644 --- a/requirements.txt +++ b/requirements.txt @@ -17,4 +17,5 @@ loguru>=0.6.0 click pytest tenacity>=8.1.0 -requests-toolbelt>=0.10.1 \ No newline at end of file +requests-toolbelt>=0.10.1 +plotly>=5.11.0 \ No newline at end of file diff --git a/test/test_plotly_plot.py b/test/test_plotly_plot.py new file mode 100644 index 000000000..0898eb345 --- /dev/null +++ b/test/test_plotly_plot.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2023/2/26 15:06 +describe: +""" +import os +import pandas as pd +from czsc import CZSC, KlineChart +from test.test_analyze import read_daily + + +def test_kline_chart(): + """测试K线图""" + bars = read_daily() + c = CZSC(bars, max_bi_num=50) + + df = pd.DataFrame(c.bars_raw) + df['text'] = "测试" + kline = KlineChart(n_rows=3) + kline.add_kline(df, name="K线") + kline.add_sma(df, ma_seq=(5, 10, 21), row=1, visible=True) + kline.add_sma(df, ma_seq=(34, 55, 89, 144), row=1, visible=False) + kline.add_vol(df, row=2) + kline.add_macd(df, row=3) + if len(c.bi_list) > 0: + bi = pd.DataFrame([{'dt': x.fx_a.dt, "bi": x.fx_a.fx, "text": x.fx_a.mark.value} for x in c.bi_list] + + [{'dt': c.bi_list[-1].fx_b.dt, "bi": c.bi_list[-1].fx_b.fx, + "text": c.bi_list[-1].fx_b.mark.value}]) + fx = pd.DataFrame([{'dt': x.dt, "fx": x.fx} for x in c.fx_list]) + kline.add_scatter_indicator(fx['dt'], fx['fx'], name="分型", row=1, line_width=1.2) + kline.add_scatter_indicator(bi['dt'], bi['bi'], name="笔", text=bi['text'], row=1, line_width=1.2) + # kline.open_in_browser() + file_html = "kline_chart_test.html" + kline.fig.write_html(file_html) + assert os.path.exists(file_html) + os.remove(file_html) + assert not os.path.exists(file_html) + diff --git a/test/test_strategy.py b/test/test_strategy.py new file mode 100644 index 000000000..9972379b2 --- /dev/null +++ b/test/test_strategy.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2023/2/23 19:13 +describe: +""" +import pandas as pd +from test.test_analyze import read_1min +from czsc.utils.bar_generator import resample_bars, Freq +from czsc.traders import generate_czsc_signals +from czsc.strategies import CzscStrategyExample1 + + +def test_czsc_strategy(): + bars = read_1min() + df = pd.DataFrame(bars) + bars = resample_bars(df, Freq.F30) + + strategy = CzscStrategyExample1(symbol="000001.SH") + + trader1 = strategy.init_trader(bars=bars, init_n=2000, sdt="20100101") + assert len(trader1.positions) == 3 + sigs = generate_czsc_signals(bars, strategy.get_signals, strategy.sorted_freqs, init_n=2000, sdt="20100101") + trader2 = strategy.dummy(sigs) + assert len(trader2.positions) == 3 + + for i in [0, 1, 2]: + pos1 = trader1.positions[i] + assert len(trader1.positions[i].evaluate()) == len(trader2.positions[i].evaluate()) + assert len(trader1.positions[i].pairs) == len(trader2.positions[i].pairs) + assert pos1.evaluate("多空")['覆盖率'] == pos1.evaluate("多头")['覆盖率'] + pos1.evaluate("空头")['覆盖率'] + diff --git a/test/test_trader_base.py b/test/test_trader_base.py index 195522f86..43c5ae12c 100644 --- a/test/test_trader_base.py +++ b/test/test_trader_base.py @@ -223,7 +223,12 @@ def __create_sma20_pos(): positions=[__create_sma5_pos(), __create_sma10_pos(), __create_sma20_pos()]) for bar in bars_right: ct.update(bar) - print(f"{bar.dt}: pos_seq = {[x.pos for x in ct.positions]}mean_pos = {ct.get_ensemble_pos('mean')}; vote_pos = {ct.get_ensemble_pos('vote')}; max_pos = {ct.get_ensemble_pos('max')}") + for _pos in ct.positions: + if _pos.pos_changed: + assert _pos.operates[-1]['dt'] == _pos.end_dt + print(_pos.name, _pos.operates[-1], _pos.end_dt, _pos.pos) + assert ct.pos_changed + # print(f"{bar.dt}: pos_seq = {[x.pos for x in ct.positions]}mean_pos = {ct.get_ensemble_pos('mean')}; vote_pos = {ct.get_ensemble_pos('vote')}; max_pos = {ct.get_ensemble_pos('max')}") assert list(ct.positions[0].dump(False).keys()) == ['symbol', 'name', 'opens', 'exits', 'interval', 'timeout', 'stop_loss', 'T0'] assert list(ct.positions[0].dump(True).keys()) == ['symbol', 'name', 'opens', 'exits', 'interval', 'timeout', 'stop_loss', 'T0', 'pairs', 'holds']