From 288baff2e4e39588e62d1185e1a3079dbc2c5251 Mon Sep 17 00:00:00 2001 From: zengbin93 Date: Wed, 17 Nov 2021 21:38:55 +0800 Subject: [PATCH] 0.8.5 upload(#65) --- README.md | 4 +- czsc/__init__.py | 4 +- czsc/analyze.py | 6 +- czsc/aphorism.py | 6 + czsc/cobra/backtest.py | 55 +-- czsc/cobra/utils.py | 21 - czsc/sensors/stocks.py | 420 +++++++++++------- czsc/sensors/utils.py | 87 ++++ czsc/traders/advanced.py | 40 +- czsc/traders/daily.py | 13 +- czsc/utils/kline_generator.py | 4 +- docs/README.md | 6 +- ...00\345\217\221\346\226\207\346\241\243.md" | 36 -- examples/run_stocks_sensors.py | 66 ++- requirements.txt | 3 +- test/test_advanced_trader.py | 34 +- test/test_cobra_backtest.py | 8 - test/test_cobra_utils.py | 27 -- test/test_traders_daily.py | 15 + 19 files changed, 483 insertions(+), 372 deletions(-) create mode 100644 czsc/sensors/utils.py delete mode 100644 "docs/\345\215\225\345\233\240\345\255\220\345\210\206\346\236\220\345\267\245\345\205\267\345\274\200\345\217\221\346\226\207\346\241\243.md" diff --git a/README.md b/README.md index 3335a1e2f..273671a47 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,7 @@ ## 使用前必看 * 这是个人开发的项目,虽然我已经尽可能的避坑,但可以很直接的说,这里面一定还有坑,使用前请仔细校验分析结果,发现新坑请告诉我,我来填; -* 目前开发完成度不高,API可能会有比较大的变动,暂时不准备写文档,没有能力看懂源码的,不建议现在使用。 +* 目前开发完成度不高,**API会有比较大的变动,谨慎升级版本**,暂时不准备写文档,没有能力看懂源码的,不建议现在使用。 * 免责声明:项目开源仅用于技术交流! ## 项目贡献 @@ -60,8 +60,6 @@ pip install czsc -U -i https://pypi.python.org/simple 目前已经实现了缠论的 `分型、笔` 的自动识别,核心代码在 `czsc.analyze` 中; -使用聚宽数据的快速入门请查看 `examples\czsc_quick_start.py` - ## 资料分享 * 链接:https://pan.baidu.com/s/1RXkP3188F0qu8Yk6CjbxRQ diff --git a/czsc/__init__.py b/czsc/__init__.py index df4d94d85..7cf95deb5 100644 --- a/czsc/__init__.py +++ b/czsc/__init__.py @@ -5,10 +5,10 @@ from .data.jq import JqCzscTrader from . import aphorism -__version__ = "0.8.4" +__version__ = "0.8.5" __author__ = "zengbin93" __email__ = "zeng_bin8888@163.com" -__date__ = "20211113" +__date__ = "20211116" print(f"欢迎使用CZSC!当前版本标识为 {__version__}@{__date__}\n") diff --git a/czsc/analyze.py b/czsc/analyze.py index 213b8165c..47100779c 100644 --- a/czsc/analyze.py +++ b/czsc/analyze.py @@ -1,10 +1,11 @@ # coding: utf-8 import os import webbrowser -from typing import List, Callable -from datetime import datetime import pandas as pd import traceback +from typing import List, Callable +from datetime import datetime +from deprecated import deprecated from collections import OrderedDict from pyecharts.charts import Tab from pyecharts.components import Table @@ -394,6 +395,7 @@ def finished_bis(self) -> List[BI]: return self.bi_list +@deprecated(reason="这个版本不支持分批买卖,实现逻辑也过于复杂,放弃维护", version='1.0.0') class CzscTrader: """缠中说禅技术分析理论之多级别联立交易决策类""" diff --git a/czsc/aphorism.py b/czsc/aphorism.py index 31f069c2d..83e0dfbf5 100644 --- a/czsc/aphorism.py +++ b/czsc/aphorism.py @@ -529,6 +529,12 @@ --摘自《2006-11-19 12:12 《论语》详解:给所有曲解孔子的人(32)》 """, + "60": """ +不要预测任何消息的影响,而应该仔细观察市场对消息的反应,即市场走势本身。如感冒之于人的体质,消 +息是测试市场体质的,而不是用来预测的。 + +--摘自《即缠非缠》微博 2021-11-16 +""", } def print_one(): diff --git a/czsc/cobra/backtest.py b/czsc/cobra/backtest.py index d2b4d96dd..bc9a0320c 100644 --- a/czsc/cobra/backtest.py +++ b/czsc/cobra/backtest.py @@ -6,6 +6,7 @@ """ import traceback import pandas as pd +from deprecated import deprecated from typing import List, Callable, Tuple from tqdm import tqdm @@ -14,6 +15,7 @@ from ..analyze import CzscTrader, KlineGenerator +@deprecated(reason="generate_signals将被弃用,请使用 czsc.sensors.utils.generate_signals 进行替换", version='1.0.0') def generate_signals(f1_raw_bars: List[RawBar], init_count: int = 50000, get_signals: Callable = get_default_signals) -> List[dict]: @@ -162,56 +164,3 @@ def long_trade_simulator(signals: List[dict], pf['结束时间'] = signals[-1]['dt'].strftime("%Y-%m-%d %H:%M") return pairs, pf - -def one_event_estimator(signals: List[dict], event: Event) -> Tuple[List[dict], dict]: - """评估单个事件的表现 - - :param signals: 信号列表,必须按时间升序 - :param event: 事件 - :return: 交易对,绩效 - """ - assert len(signals) > 1000 and signals[1]['dt'] > signals[0]['dt'] - - trades = [] - cache = {'long_stop_price': -1, 'last_op': None} - - for signal in signals: - m, f = event.is_match(signal) - if cache['last_op'] != Operate.LO and m: - trades.append({ - "标的代码": signal['symbol'], - '开仓时间': signal['dt'].strftime("%Y-%m-%d %H:%M"), - '开仓价格': signal['close'], - '开仓理由': f, - 'oid': signal['id'], - }) - cache['last_op'] = Operate.LO - - if cache['last_op'] == Operate.LO and not m: - trades.append({ - "标的代码": signal['symbol'], - '平仓时间': signal['dt'].strftime("%Y-%m-%d %H:%M"), - '平仓价格': signal['close'], - '平仓理由': "事件空白", - 'eid': signal['id'], - }) - cache['last_op'] = Operate.LE - - if len(trades) % 2 != 0: - trades = trades[:-1] - - pairs = [] - for i in range(0, len(trades), 2): - o, e = dict(trades[i]), dict(trades[i + 1]) - o.update(e) - o['持仓分钟'] = o['eid'] - o['oid'] - o['盈亏(%)'] = int((o['平仓价格'] - o['开仓价格']) / o['开仓价格'] * 10000) / 100 - pairs.append(o) - - pf = long_trade_estimator(pairs) - pf['基准收益(%)'] = int((signals[-1]['close'] - signals[0]['open']) / signals[0]['open'] * 10000) / 100 - pf['基准每分钟BP'] = round((pf['基准收益(%)'] * 100) / (signals[-1]['id'] - signals[0]['id']), 4) - pf['开始时间'] = signals[0]['dt'].strftime("%Y-%m-%d %H:%M") - pf['结束时间'] = signals[-1]['dt'].strftime("%Y-%m-%d %H:%M") - return pairs, pf - diff --git a/czsc/cobra/utils.py b/czsc/cobra/utils.py index 4ea3e7899..ce2a79aa6 100644 --- a/czsc/cobra/utils.py +++ b/czsc/cobra/utils.py @@ -100,24 +100,3 @@ def kdj_dead_cross(kline: Union[List[RawBar], pd.DataFrame], just: bool = True) return False -def drop_duplicates_by_window(seq: List[Union[str, int, float]], - default_value: [str, int, float] = None, - window_size: int = 5) -> List[Union[str, int, float]]: - """从左到右按窗口进行去重,并使用 default_value 进行填充 - - :param seq: 输入序列 - :param default_value: 重复位置的默认填充值 - :param window_size: 窗口大小 - :return: 去重后的序列 - """ - for i in range(len(seq)): - if i < window_size: - left = seq[: i] - else: - left = seq[i-window_size+1: i] - - if seq[i] in left: - seq[i] = default_value - return seq - - diff --git a/czsc/sensors/stocks.py b/czsc/sensors/stocks.py index 70b01c830..28117ad08 100644 --- a/czsc/sensors/stocks.py +++ b/czsc/sensors/stocks.py @@ -4,199 +4,291 @@ email: zeng_bin8888@163.com create_dt: 2021/10/30 20:18 describe: A股市场感应器若干,主要作为编写感应器的示例 - -强势个股传感器 -强势板块传感器 -强势行业传感器 -大盘指数传感器 """ import os.path import traceback +import inspect from datetime import timedelta, datetime -from collections import OrderedDict - +from collections import OrderedDict, Counter import pandas as pd from tqdm import tqdm +from typing import Callable +from czsc.objects import Event +from czsc.data.ts_cache import TsDataCache, Freq +from czsc.sensors.utils import get_index_beta, generate_signals, max_draw_down +from czsc.utils import WordWriter -from ..utils import WordWriter -from ..data.ts_cache import TsDataCache -from ..signals import get_selector_signals -from ..objects import Operate, Signal, Factor, Event, Freq -from ..utils.kline_generator import KlineGeneratorD -from ..utils import io -from ..traders.daily import CzscDailyTrader - -class StrongStocksSensor: - """强势个股传感器 +class StocksDaySensor: + """以日线为基础周期的强势股票感应器 输入:市场个股全部行情、概念板块成分信息 输出:强势个股列表以及概念板块分布 """ - def __init__(self, dc: TsDataCache): + def __init__(self, + dc: TsDataCache, + get_signals: Callable, + get_event: Callable, + params: dict = None): + self.name = self.__class__.__name__ + self.version = "V20211117" self.data = OrderedDict() + self.get_signals = get_signals + self.get_event = get_event + self.event: Event = get_event() + self.base_freq = Freq.D.value + self.freqs = [Freq.W.value, Freq.M.value] + + if params: + self.params = params + else: + self.params = { + "validate_sdt": "20210101", + "validate_edt": "20211112", + "min_total_mv": 1e6, # 最小总市值,单位为万元,1e6万元 = 100亿 + "fc_top_n": 40, # 板块效应 - 选择出现数量最多的 top_n 概念 + 'fc_min_n': 4 # 单股票至少有 min_n 概念在 top_n 中 + } + self.dc = dc - self.strong_event = Event(name="选股", operate=Operate.LO, factors=[ - Factor(name="月线KDJ金叉_日线MACD强势", signals_all=[ - Signal("月线_KDJ状态_任意_金叉_任意_任意_0"), - Signal('日线_MACD状态_任意_DIFF大于0_DEA大于0_柱子增大_0'), - Signal('日线_MA5状态_任意_收盘价在MA5上方_任意_任意_0'), - ]), - - Factor(name="月线KDJ金叉_日线潜在三买", signals_all=[ - Signal("月线_KDJ状态_任意_金叉_任意_任意_0"), - Signal('日线_倒0笔_潜在三买_构成中枢_近3K在中枢上沿附近_近7K突破中枢GG_0'), - Signal('日线_MA5状态_任意_收盘价在MA5上方_任意_任意_0'), - ]), - - Factor( - name="月线KDJ金叉_周线三笔强势", - signals_all=[ - Signal("月线_KDJ状态_任意_金叉_任意_任意_0"), - Signal('日线_MA5状态_任意_收盘价在MA5上方_任意_任意_0'), - ], - signals_any=[ - Signal('周线_倒1笔_三笔形态_向下不重合_任意_任意_0'), - Signal('周线_倒1笔_三笔形态_向下奔走型_任意_任意_0'), - Signal('周线_倒1笔_三笔形态_向下盘背_任意_任意_0'), - ] - ), - - Factor(name="月线KDJ金叉_周线MACD强势", signals_all=[ - Signal("月线_KDJ状态_任意_金叉_任意_任意_0"), - Signal('周线_MACD状态_任意_DIFF大于0_DEA大于0_柱子增大_0'), - Signal('日线_MA5状态_任意_收盘价在MA5上方_任意_任意_0'), - ]), - - ]) - - def get_share_czsc_signals(self, ts_code: str, trade_date: datetime): - """获取 ts_code 在 trade_date 的信号字典""" - start_date = trade_date - timedelta(days=5000) - bars = self.dc.pro_bar(ts_code=ts_code, start_date=start_date, end_date=trade_date, - freq='D', asset="E", raw_bar=True) - assert bars[-1].dt.date() == trade_date.date() - kgd = KlineGeneratorD(freqs=[Freq.D.value, Freq.W.value, Freq.M.value]) - for bar in bars: - kgd.update(bar) - ct = CzscDailyTrader(kgd, get_selector_signals) - return dict(ct.s) - - def process_one_day(self, trade_date: [datetime, str]): - if isinstance(trade_date, str): - trade_date = pd.to_datetime(trade_date) + self.betas = ['000001.SH', '000016.SH', '000905.SH', '000300.SH', '399001.SZ', '399006.SZ'] + self.all_cache = dict() + self.res_cache = dict() + + self.sdt = self.params['validate_sdt'] + self.edt = self.params['validate_edt'] + def get_share_strong_days(self, ts_code: str, name: str): + """获取单个标的全部强势信号日期""" dc = self.dc - stocks = dc.stock_basic() - stocks = stocks[stocks.list_date <= (trade_date - timedelta(days=365)).strftime('%Y%m%d')] - records = stocks.to_dict('records') + event = self.event + sdt = self.sdt + edt = self.edt + + start_date = pd.to_datetime(self.sdt) - timedelta(days=3000) + bars = dc.pro_bar(ts_code=ts_code, start_date=start_date, end_date=edt, freq='D', asset="E", raw_bar=True) + df = dc.pro_bar(ts_code=ts_code, start_date=start_date, end_date=edt, freq='D', asset="E", raw_bar=False) + nb_dicts = {row['trade_date'].strftime("%Y%m%d"): row for row in df.to_dict("records")} - event = self.strong_event + signals = generate_signals(bars, sdt, self.base_freq, self.freqs, self.get_signals) results = [] - for row in tqdm(records, desc=f"{trade_date} selector"): - symbol = row['ts_code'] - try: - s = self.get_share_czsc_signals(symbol, trade_date) - m, f = event.is_match(s) - if m: - dt_fmt = "%Y%m%d" - res = { - 'symbol': symbol, - 'name': row['name'], - 'reason': f, - 'end_dt': trade_date.strftime(dt_fmt), - 'F10': f"http://basic.10jqka.com.cn/{symbol.split('.')[0]}", - 'Kline': f"https://finance.sina.com.cn/realstock/company/{symbol[-2:].lower()}{symbol[:6]}/nc.shtml" - } - results.append(res) - print(res) - except: - print("fail on {}".format(symbol)) - traceback.print_exc() - return results + for s in signals: + m, f = event.is_match(s) + if m: + res = { + 'ts_code': ts_code, + 'name': name, + 'reason': f, + } + nb_info = nb_dicts.get(s['dt'].strftime("%Y%m%d"), None) + if not nb_info: + print(f"not match nb info: {nb_info}") - def get_share_hist_signals(self, ts_code: str, trade_date: datetime): - """获取单个标的全部历史信号""" - file_pkl = os.path.join(self.dc.cache_path, f"{ts_code}_all_hist_signals.pkl") - if os.path.exists(file_pkl): - all_hist_signals = io.read_pkl(file_pkl) + res.update(nb_info) + results.append(res) + df_res = pd.DataFrame(results) + + if df_res.empty: + print(f"{ts_code} | {name} | empty") else: - start_date = pd.to_datetime(self.dc.sdt) - timedelta(days=1000) - bars = self.dc.pro_bar(ts_code=ts_code, start_date=start_date, end_date=self.dc.edt, - freq='D', asset="E", raw_bar=True) - kgd = KlineGeneratorD(freqs=[Freq.D.value, Freq.W.value, Freq.M.value]) - for bar in bars[:250]: - kgd.update(bar) - ct = CzscDailyTrader(kgd, get_selector_signals) - - all_hist_signals = {} - for bar in tqdm(bars[250:], desc=f"{ts_code} all hist signals"): - ct.update(bar) - all_hist_signals[bar.dt.strftime('%Y%m%d')] = dict(ct.s) - - io.save_pkl(all_hist_signals, file_pkl) - - return all_hist_signals.get(trade_date.strftime("%Y%m%d"), None) - - def get_share_hist_returns(self, ts_code: str, trade_date: datetime): - """获取单个标 trade_date 后的 n bar returns""" - df = self.dc.pro_bar(ts_code=ts_code, start_date=trade_date, end_date=trade_date, - freq='D', asset="E", raw_bar=False) - if df.empty: - return None - else: - assert len(df) == 1 - return df.iloc[0].to_dict() + df_ = dc.daily_basic(ts_code, sdt, dc.edt) + df_['trade_date'] = pd.to_datetime(df_['trade_date']) + df_res = df_res.merge(df_[['trade_date', 'total_mv']], on='trade_date', how='left') + df_res = df_res[df_res['total_mv'] >= self.params['min_total_mv']] + print(f"\n{ts_code} | {name} | {len(df_res)} | {df_res.n1b.mean()} | {int(df_res.n1b.sum())}") - def validate(self, sdt='20200101', edt='20201231'): - """验证传感器在一段时间内的表现 + self.all_cache[ts_code] = df_res - :param sdt: 开始时间 - :param edt: 结束时间 + def filter_shares_by_concepts(self, dfg, top_n=20, min_n=3): + """使用板块效应过滤某天的选股结果 + + :param dfg: 某一天的选股结果 + :param top_n: 选取前 n 个密集概念 + :param min_n: 单股票至少要有 n 个概念在 top_n 中 :return: """ - stocks = self.dc.stock_basic() - trade_cal = self.dc.trade_cal() - trade_cal = trade_cal[(trade_cal.cal_date >= sdt) & (trade_cal.cal_date <= edt) & trade_cal.is_open] + dc = self.dc + ths_members = dc.get_all_ths_members() + ths_members = ths_members[ths_members['概念类别'] == 'N'] + ths_members = ths_members[~ths_members['概念名称'].isin([ + 'MSCI概念', '沪股通', '深股通', '融资融券', '上证180成份股', '央企国资改革', + '标普道琼斯A股', '中证500成份股', '上证380成份股', '沪深300样本股', + ])] + + ths_concepts = ths_members[ths_members.code.isin(dfg.ts_code)] + all_concepts = ths_concepts['概念名称'].to_list() + key_concepts = [k for k, v in Counter(all_concepts).most_common(top_n)] + + sel = ths_concepts[ths_concepts['概念名称'].isin(key_concepts)] + ts_codes = [k for k, v in Counter(sel.code).most_common() if v >= min_n] + dfg = dfg[dfg.ts_code.isin(ts_codes)] + dfg['概念板块'] = dfg.ts_code.apply(lambda x: ths_concepts[ths_concepts.code == x]['概念名称'].to_list()) + dfg['概念数量'] = dfg['概念板块'].apply(len) + + return dfg, key_concepts + + def validate(self): + """验证传感器在一段时间内的表现""" + dc = self.dc + stocks = dc.stock_basic() + sdt = self.params['validate_sdt'] + edt = self.params['validate_edt'] + + for row in tqdm(stocks.to_dict('records'), desc="validate"): + ts_code = row['ts_code'] + name = row['name'] + try: + self.get_share_strong_days(ts_code, name) + except: + print(f"get_share_strong_days error: {ts_code}, {name}") + traceback.print_exc() + + res = [] + for ts_code, x in self.all_cache.items(): + if x.empty: + continue + x = x[pd.to_datetime(sdt) <= x['trade_date']] + x = x[x['trade_date'] <= pd.to_datetime(edt)] + if not x.empty: + res.append(x) + + df = pd.concat(res, ignore_index=True) + + trade_cal = dc.trade_cal() + trade_cal = trade_cal[trade_cal.is_open == 1] trade_dates = trade_cal.cal_date.to_list() - event = self.strong_event results = [] - for trade_date in trade_dates: - trade_date = pd.to_datetime(trade_date) - min_list_date = (trade_date - timedelta(days=365)).strftime('%Y%m%d') - rows = stocks[stocks.list_date <= min_list_date].to_dict('records') - - for row in tqdm(rows, desc=trade_date.strftime('%Y%m%d')): - ts_code = row['ts_code'] - try: - s = self.get_share_hist_signals(ts_code, trade_date) - if not s: - continue - - n = self.get_share_hist_returns(ts_code, trade_date) - m, f = event.is_match(s) - if m: - res = { - 'symbol': ts_code, - 'name': row['name'], - 'reason': f, - 'trade_date': trade_date.strftime("%Y%m%d"), - } - res.update(n) - results.append(res) - print(res) - except: - traceback.print_exc() - - df = pd.DataFrame(results) - df_m = df.groupby('trade_date').apply(lambda x: x[['n1b', 'n2b', 'n3b', 'n5b', 'n10b', 'n20b']].mean()) - return df, df_m - - def report(self, writer: WordWriter): + detail = [] + holds = [] + + for trade_date, dfg in df.groupby('trade_date'): + if dfg.empty: + print(f"{trade_date} 选股结果为空") + continue + + if self.params['fc_top_n'] > 0: + top_n = self.params['fc_top_n'] + min_n = self.params['fc_min_n'] + dfg, key_concepts = self.filter_shares_by_concepts(dfg, top_n=top_n, min_n=min_n) + else: + key_concepts = "" + + res = {'trade_date': trade_date, "key_concepts": key_concepts, 'number': len(dfg)} + res.update(dfg[['n1b', 'n2b', 'n3b', 'n5b', 'n10b', 'n20b']].mean().to_dict()) + results.append(res) + detail.append(dfg) + + # 构建持仓明显 + try: + hold = dfg.copy() + hold['成分日期'] = trade_dates[trade_dates.index(trade_date.strftime("%Y%m%d")) + 1] + hold['持仓权重'] = 0.98 / len(dfg) + hold.rename({'ts_code': "证券代码", "close": "交易价格"}, inplace=True, axis=1) + hold = hold[['证券代码', '持仓权重', '交易价格', '成分日期']] + hold['成分日期'] = pd.to_datetime(hold['成分日期']).apply(lambda x: x.strftime("%Y/%m/%d")) + holds.append(hold) + except: + print(f"fail on {trade_date}, {dfg}") + traceback.print_exc() + + df_detail = pd.concat(detail) + df_holds = pd.concat(holds, ignore_index=True) + df_merged = pd.DataFrame(results) + df_merged['trade_date'] = pd.to_datetime(df_merged['trade_date']) + return df_detail, df_merged, df_holds + + def get_index_beta(self): + """获取基准指数的Beta""" + dc = self.dc + sdt = self.params['validate_sdt'] + edt = self.params['validate_edt'] + + beta = {} + for ts_code in self.betas: + df = dc.pro_bar(ts_code=ts_code, start_date=sdt, end_date=edt, + freq='D', asset="I", raw_bar=False) + beta[ts_code] = df + return beta + + def report_performance(self, results_path, file_docx='股票选股强度验证.docx'): """撰写报告""" - raise NotImplementedError + dc = self.dc + sdt = self.sdt + edt = self.edt + + writer = WordWriter(file_docx) + if not os.path.exists(file_docx): + writer.add_title("股票选股强度验证") + + writer.add_page_break() + writer.add_heading(f"{datetime.now().strftime('%Y-%m-%d %H:%M')} {self.event.name}", level=1) + + writer.add_heading("参数配置", level=2) + writer.add_df_table(pd.DataFrame({"参数名称": list(self.params.keys()), '参数值': list(self.params.values())})) + writer.add_paragraph(f"测试方法描述:{self.event.name}") + writer.add_paragraph(f"信号计算函数:\n{inspect.getsource(self.get_signals)}") + writer.add_paragraph(f"事件具体描述:\n{inspect.getsource(self.get_event)}") + writer.save() + + writer.add_heading("测试结果", level=2) + + df_detail, df_merged, df_holds = self.validate() + beta = get_index_beta(dc, sdt, edt, self.betas) + + df_n1b = pd.DataFrame() + for name, df_ in beta.items(): + df_n1b['trade_date'] = pd.to_datetime(df_.trade_date.to_list()) + df_n1b[name] = df_.n1b.to_list() + + df_ = df_merged[['trade_date', 'number', 'n1b']] + df_.rename({'n1b': 'selector'}, axis=1, inplace=True) + df_.reset_index(drop=True, inplace=True) + df_n1b = df_n1b.merge(df_, on='trade_date', how='left') + df_n1b.fillna(0, inplace=True) + + os.makedirs(results_path, exist_ok=True) + df_detail.to_excel(os.path.join(results_path, f"选股结果_{sdt}_{edt}.xlsx"), index=False) + df_holds.to_excel(os.path.join(results_path, f"持仓明细_{sdt}_{edt}.xlsx"), index=False) + df_merged.to_excel(os.path.join(results_path, f"每日统计_{sdt}_{edt}.xlsx"), index=False) + df_n1b.to_excel(os.path.join(results_path, f"资金曲线_{sdt}_{edt}.xlsx"), index=False) + + f = pd.ExcelWriter(os.path.join(results_path, f"基准曲线_{sdt}_{edt}.xlsx")) + for name, df_ in beta.items(): + df_.to_excel(f, index=False, sheet_name=name) + f.close() + + mdd_info = {} + for col in self.betas + ['selector']: + df_n1b[f"{col}_curve"] = df_n1b[col].cumsum() + df_n1b[f"{col}_curve"] += 10000 + start_i, end_i, mdd = max_draw_down(df_n1b[col].to_list()) + + start_dt = df_n1b.iloc[start_i]['trade_date'] + end_dt = df_n1b.iloc[end_i]['trade_date'] + msg = f"{col} mdd: {start_dt.strftime('%Y%m%d')} - {end_dt.strftime('%Y%m%d')} - {mdd};nv = {int(df_n1b[col].sum())}" + print(msg) + writer.add_paragraph(msg) + mdd_info[col] = { + "start_date": start_dt, + "end_date": end_dt, + "mdd": mdd, + } + + df_n1b.to_excel(os.path.join(results_path, f"资金曲线_{sdt}_{edt}.xlsx"), index=False) + writer.save() + + self.res_cache = { + "detail": df_detail, + "holds": df_holds, + "merged": df_merged, + "curve": df_n1b, + "beta": beta + } + return df_holds + + diff --git a/czsc/sensors/utils.py b/czsc/sensors/utils.py new file mode 100644 index 000000000..edd8d4258 --- /dev/null +++ b/czsc/sensors/utils.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2021/11/17 18:50 +""" +import warnings +import pandas as pd +import numpy as np +from tqdm import tqdm +from typing import Callable, List, AnyStr +from czsc.traders.advanced import CzscAdvancedTrader, BarGenerator, RawBar +from czsc.data.ts_cache import TsDataCache + + +def get_index_beta(dc: TsDataCache, sdt: str, edt: str, indices=None): + """获取基准指数的Beta + + :param dc: Tushare 数据缓存对象 + :param sdt: 开始日期 + :param edt: 结束日期 + :param indices: 基准指数列表 + :return: beta + """ + if not indices: + indices = ['000001.SH', '000016.SH', '000905.SH', '000300.SH', '399001.SZ', '399006.SZ'] + + beta = {} + for ts_code in indices: + df = dc.pro_bar(ts_code=ts_code, start_date=sdt, end_date=edt, freq='D', asset="I", raw_bar=False) + beta[ts_code] = df + return beta + + +def generate_signals(bars: List[RawBar], sdt: AnyStr, base_freq: AnyStr, freqs: List[AnyStr], get_signals: Callable): + """获取历史信号 + + :param bars: 日线 + :param sdt: 信号计算开始时间 + :param base_freq: 合成K线的基础周期 + :param freqs: K线周期列表 + :param get_signals: 单级别信号计算函数 + :return: signals + """ + sdt = pd.to_datetime(sdt) + bars_left = [x for x in bars if x.dt < sdt] + if len(bars_left) <= 500: + bars_left = bars[:500] + bars_right = bars[500:] + else: + bars_right = [x for x in bars if x.dt >= sdt] + + if len(bars_right) == 0: + warnings.warn("右侧K线为空,无法进行信号生成", category=RuntimeWarning) + return [] + + bg = BarGenerator(base_freq=base_freq, freqs=freqs, max_count=5000) + for bar in bars_left: + bg.update(bar) + + signals = [] + ct = CzscAdvancedTrader(bg, get_signals) + for bar in tqdm(bars_right, desc=f'generate signals of {bg.symbol}'): + ct.update(bar) + signals.append(dict(ct.s)) + return signals + + +def max_draw_down(n1b: List): + """最大回撤 + + 参考:https://blog.csdn.net/weixin_38997425/article/details/82915386 + + :param n1b: 逐个结算周期的收益列表,单位:BP,换算关系是 10000BP = 100% + 如,n1b = [100.1, -90.5, 212.6],表示第一个结算周期收益为100.1BP,也就是1.001%,以此类推。 + :return: 最大回撤起止位置和最大回撤 + """ + curve = np.cumsum(n1b) + # 获取结束位置 + i = np.argmax((np.maximum.accumulate(curve) - curve) / np.maximum.accumulate(curve)) + if i == 0: + return 0, 0, 0 + + # 获取开始位置 + j = np.argmax(curve[:i]) + mdd = int((curve[j] - curve[i]) / curve[j] * 10000) / 10000 + return j, i, mdd diff --git a/czsc/traders/advanced.py b/czsc/traders/advanced.py index 05cd855e4..729bd692e 100644 --- a/czsc/traders/advanced.py +++ b/czsc/traders/advanced.py @@ -15,31 +15,29 @@ from ..analyze import CZSC from ..objects import PositionLong, Operate, Event, RawBar -from ..utils import KlineGenerator +from ..utils import BarGenerator from ..utils.cache import home_path class CzscAdvancedTrader: - """缠中说禅技术分析理论之多级别联立交易决策类(支持分批开平仓)""" + """缠中说禅技术分析理论之多级别联立交易决策类(支持分批开平仓 / 支持从任意周期开始交易)""" - def __init__(self, kg: KlineGenerator, get_signals: Callable, + def __init__(self, bg: BarGenerator, get_signals: Callable, long_events: List[Event] = None, long_pos: PositionLong = None): """ - :param kg: K线合成器 + :param bg: K线合成器 :param get_signals: 自定义的单级别信号计算函数 :param long_events: 自定义的多头交易事件组合,推荐平仓事件放到前面 :param long_pos: 多头仓位对象 """ self.name = "CzscAdvancedTrader" - self.kg = kg - self.freqs = kg.freqs + self.bg = bg + self.base_freq = bg.base_freq + self.freqs = list(bg.bars.keys()) self.long_events = long_events self.long_pos = long_pos - - klines = self.kg.get_klines({k: 3000 for k in self.freqs}) - self.kas = {k: CZSC(klines[k], max_bi_count=50, get_signals=get_signals) for k in klines.keys()} - + self.kas = {freq: CZSC(b, max_bi_count=50, get_signals=get_signals) for freq, b in bg.bars.items()} self.s = self._cal_signals() def __repr__(self): @@ -86,30 +84,28 @@ def open_in_browser(self, width="1400px", height="580px"): def _cal_signals(self): """计算信号""" - self.symbol = self.kas["1分钟"].symbol - self.end_dt = self.kas["1分钟"].bars_raw[-1].dt - self.latest_price = self.kas["1分钟"].bars_raw[-1].close + base_freq = self.base_freq + self.symbol = self.kas[base_freq].symbol + self.end_dt = self.kas[base_freq].bars_raw[-1].dt + self.bid = self.kas[base_freq].bars_raw[-1].id + self.latest_price = self.kas[base_freq].bars_raw[-1].close s = OrderedDict() for freq, ks in self.kas.items(): s.update(ks.signals) - s.update(self.kas['1分钟'].bars_raw[-1].__dict__) + s.update(self.kas[base_freq].bars_raw[-1].__dict__) return s def update(self, bar: RawBar): """输入1分钟K线,更新信号,更新仓位""" - self.kg.update(bar) - klines_one = self.kg.get_klines({freq: 1 for freq in self.freqs}) - - for freq, klines_ in klines_one.items(): - self.kas[freq].update(klines_[-1]) - + self.bg.update(bar) + for freq, b in self.bg.bars.items(): + self.kas[freq].update(b[-1]) self.s = self._cal_signals() - dt = self.end_dt price = self.latest_price - bid = self.kg.m1[-1].id + bid = self.bid # 遍历 long_events,更新 long_pos if self.long_events: diff --git a/czsc/traders/daily.py b/czsc/traders/daily.py index f419a0a8c..826d6ace4 100644 --- a/czsc/traders/daily.py +++ b/czsc/traders/daily.py @@ -15,13 +15,14 @@ from ..analyze import CZSC, Freq, Event, RawBar from ..utils.kline_generator import KlineGeneratorD +from ..utils.bar_generator import BarGenerator from ..utils.cache import home_path class CzscDailyTrader: """缠中说禅技术分析理论之日线多级别联立交易决策类""" - def __init__(self, kg: KlineGeneratorD, get_signals: Callable, events: List[Event] = None): + def __init__(self, kg: [KlineGeneratorD, BarGenerator], get_signals: Callable, events: List[Event] = None): """ :param kg: K线合成器 @@ -32,9 +33,7 @@ def __init__(self, kg: KlineGeneratorD, get_signals: Callable, events: List[Even self.kg = kg self.freqs = kg.freqs self.events = events - - klines = {freq: self.kg.bars[freq][-1000:] for freq in self.freqs} - self.kas = {k: CZSC(klines[k], max_bi_count=50, get_signals=get_signals) for k in klines.keys()} + self.kas = {k: CZSC(b[-1000:], max_bi_count=50, get_signals=get_signals) for k, b in self.kg.bars.items()} self.s = self._cal_signals() def __repr__(self): @@ -101,9 +100,7 @@ def update(self, bar: RawBar): """ assert bar.freq == Freq.D self.kg.update(bar) - klines_one = {freq: self.kg.bars[freq][-1] for freq in self.freqs} - - for freq, bar in klines_one.items(): - self.kas[freq].update(bar) + for freq, bar in self.kg.bars.items(): + self.kas[freq].update(bar[-1]) self.s = self._cal_signals() diff --git a/czsc/utils/kline_generator.py b/czsc/utils/kline_generator.py index f1e488542..3b1a093f4 100644 --- a/czsc/utils/kline_generator.py +++ b/czsc/utils/kline_generator.py @@ -1,4 +1,5 @@ # coding: utf-8 +from deprecated import deprecated from datetime import datetime, timedelta from typing import List, Union from ..enum import Freq @@ -103,6 +104,7 @@ def freq_end_time(dt: datetime, freq: Freq) -> datetime: return dt +@deprecated(reason="请使用 BarGenerator,性能更高,且支持从任意周期开始合成", version='1.0.0') class KlineGenerator: """K线生成器,仿实盘""" @@ -309,7 +311,7 @@ def get_klines(self, counts=None): counts = {"1分钟": 1000, "5分钟": 1000, "30分钟": 1000, "日线": 100} return {k: self.get_kline(k, v) for k, v in counts.items()} - +@deprecated(reason="请使用 BarGenerator,性能更高,且支持从任意周期开始合成", version='1.0.0') class KlineGeneratorD: """使用日线合成周线、月线、季线""" def __init__(self, freqs: List[str] = None): diff --git a/docs/README.md b/docs/README.md index 11e4718af..82ee2545f 100644 --- a/docs/README.md +++ b/docs/README.md @@ -1,9 +1,9 @@ # 文档 -## 对象设计 +## 参考资料 -```python +* [前复权、后复权、不复权价格区别与计算](https://liguoqinjim.cn/post/quant/fq_price/) +* [使用飞书创建自己的通知机器人](https://liguoqinjim.cn/post/tool/%E4%BD%BF%E7%94%A8%E9%A3%9E%E4%B9%A6%E5%88%9B%E5%BB%BA%E8%87%AA%E5%B7%B1%E7%9A%84%E9%80%9A%E7%9F%A5%E6%9C%BA%E5%99%A8%E4%BA%BA/) -``` diff --git "a/docs/\345\215\225\345\233\240\345\255\220\345\210\206\346\236\220\345\267\245\345\205\267\345\274\200\345\217\221\346\226\207\346\241\243.md" "b/docs/\345\215\225\345\233\240\345\255\220\345\210\206\346\236\220\345\267\245\345\205\267\345\274\200\345\217\221\346\226\207\346\241\243.md" deleted file mode 100644 index 547ec0fc9..000000000 --- "a/docs/\345\215\225\345\233\240\345\255\220\345\210\206\346\236\220\345\267\245\345\205\267\345\274\200\345\217\221\346\226\207\346\241\243.md" +++ /dev/null @@ -1,36 +0,0 @@ -## 单因子分析工具开发文档 - -单因子分析工具的主要功能是,输入一组计算好的因子,分析这些因子的历史表现。 - -1. 纵向分析因子在某个标的上的历史表现; -2. 横向分析因子在某个时点的全市场表现。 - -### 评价方法 - ---- - -评价方法:1)N周期区间百分位;2)N周期绝对收益 - -#### 评价方法一:N周期区间百分位 ->假设选股策略确认选股时间为 T,股价为 P,T+N 区间内的K线最高价为 MAX_P,最低价为 MIN_P,则 -> ->N周期区间百分位 = (MAX_P - P) / (MAX_P - MIN_P) * 100; -> ->N周期区间百分位值域为(0, 100),值越大,说明离区间最大值越远,对于做多交易而言有更高的安全性。 - -#### 评价方法二:N周期绝对收益 ->假设选股策略确认选股时间为 T,股价为 P,第T+N 根K线收盘价为 C,则 -> ->N周期绝对收益 = (C - P) / P * 100; -> ->N周期绝对收益值域为(-100, +inf),值越大,说明绝对收益越大,对于做多交易而言有更高的收益率。 - -### 因子输入标准 - - - - - - - - diff --git a/examples/run_stocks_sensors.py b/examples/run_stocks_sensors.py index fe11347dd..7ad6c432f 100644 --- a/examples/run_stocks_sensors.py +++ b/examples/run_stocks_sensors.py @@ -8,17 +8,67 @@ sys.path.insert(0, '.') sys.path.insert(0, '..') -import pandas as pd from czsc.data.ts_cache import TsDataCache -from czsc.sensors.stocks import StrongStocksSensor +from czsc.sensors.stocks import StocksDaySensor +from czsc.objects import Operate, Signal, Factor, Event +from czsc.signals import get_selector_signals + dc = TsDataCache(data_path=r'C:\ts_data', sdt='2019-01-01', edt='20211029') +def get_event(): + event = Event(name="选股", operate=Operate.LO, factors=[ + Factor(name="月线KDJ金叉_日线MACD强势", signals_all=[ + Signal("月线_KDJ状态_任意_金叉_任意_任意_0"), + Signal('日线_MACD状态_任意_DIFF大于0_DEA大于0_柱子增大_0'), + Signal('日线_MA5状态_任意_收盘价在MA5上方_任意_任意_0'), + ]), + + Factor(name="月线KDJ金叉_日线潜在三买", signals_all=[ + Signal("月线_KDJ状态_任意_金叉_任意_任意_0"), + Signal('日线_倒0笔_潜在三买_构成中枢_近3K在中枢上沿附近_近7K突破中枢GG_0'), + Signal('日线_MA5状态_任意_收盘价在MA5上方_任意_任意_0'), + ]), + + Factor( + name="月线KDJ金叉_周线三笔强势", + signals_all=[ + Signal("月线_KDJ状态_任意_金叉_任意_任意_0"), + Signal('日线_MA5状态_任意_收盘价在MA5上方_任意_任意_0'), + ], + signals_any=[ + Signal('周线_倒1笔_三笔形态_向下不重合_任意_任意_0'), + Signal('周线_倒1笔_三笔形态_向下奔走型_任意_任意_0'), + Signal('周线_倒1笔_三笔形态_向下盘背_任意_任意_0'), + ] + ), + + Factor(name="月线KDJ金叉_周线MACD强势", signals_all=[ + Signal("月线_KDJ状态_任意_金叉_任意_任意_0"), + Signal('周线_MACD状态_任意_DIFF大于0_DEA大于0_柱子增大_0'), + Signal('日线_MA5状态_任意_收盘价在MA5上方_任意_任意_0'), + ]), + + ]) + return event + + if __name__ == '__main__': - sss = StrongStocksSensor(dc) - results = sss.validate(sdt='20200101', edt='20201231') + params = { + "validate_sdt": "20210101", + "validate_edt": "20211114", + "min_total_mv": 5e5, # 最小总市值,单位为万元,1e6万元 = 100亿 + "fc_top_n": 30, # 板块效应 - 选择出现数量最多的 top_n 概念 + 'fc_min_n': 2 # 单股票至少有 min_n 概念在 top_n 中 + } + sds = StocksDaySensor(dc, get_selector_signals, get_event, params) + + results_path = fr"C:\ZB\data\strong_stocks_selector\{sds.event.name}_{sds.sdt}_{sds.edt}" + file_docx = r"C:\ZB\data\strong_stocks_selector\股票选股强度验证.docx" + df_holds = sds.report_performance(results_path, file_docx) + + # 获取最后一个交易日的持仓明细 + # max_date = df_holds.成分日期.max() + # df_last_holds = df_holds[df_holds['成分日期'] == max_date] + # df_last_holds.to_excel(f"{sss.event.name}_{max_date.replace('/', '')}_持仓明细.xlsx", index=False) - # res = sss.process_one_day(trade_date='20211029') - # - # s = sss.get_share_hist_signals(ts_code='000001.SZ', trade_date=pd.to_datetime('20211029')) - # n = sss.get_share_hist_returns(ts_code='000001.SZ', trade_date=pd.to_datetime('20211029')) diff --git a/requirements.txt b/requirements.txt index 5b8caefec..828be03df 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,4 +9,5 @@ tushare python-docx>=0.8.11 transitions matplotlib -seaborn \ No newline at end of file +seaborn +Deprecated>=1.2.12 \ No newline at end of file diff --git a/test/test_advanced_trader.py b/test/test_advanced_trader.py index 17b5fe0a5..09df62b58 100644 --- a/test/test_advanced_trader.py +++ b/test/test_advanced_trader.py @@ -8,9 +8,9 @@ from tqdm import tqdm from czsc.traders.advanced import CzscAdvancedTrader from czsc.signals import * -from czsc.utils import KlineGenerator +from czsc.utils import BarGenerator from czsc.objects import Signal, Factor, Event, Operate, PositionLong -from test.test_analyze import read_1min +from test.test_analyze import read_1min, read_daily, get_user_signals def get_default_signals(c: analyze.CZSC) -> OrderedDict: """在 CZSC 对象上计算信号,这个是标准函数,主要用于研究。 @@ -21,21 +21,29 @@ def get_default_signals(c: analyze.CZSC) -> OrderedDict: :return: 信号字典 """ s = OrderedDict({"symbol": c.symbol, "dt": c.bars_raw[-1].dt, "close": c.bars_raw[-1].close}) - - # for di in range(1, 2): - # s.update(get_s_three_bi(c, di)) - # - # for di in range(1, 2): - # s.update(get_s_base_xt(c, di)) - for di in range(1, 2): s.update(get_s_like_bs(c, di)) return s +def test_daily_trader(): + bars = read_daily() + kg = BarGenerator(base_freq='日线', freqs=['周线', '月线']) + for bar in bars[:1000]: + kg.update(bar) + + ct = CzscAdvancedTrader(kg, get_user_signals) + + signals = [] + for bar in bars[1000:]: + ct.update(bar) + signals.append(dict(ct.s)) + + assert len(signals) == 2332 + def test_advanced_trader_with_t0(): bars = read_1min() - kg = KlineGenerator(max_count=3000, freqs=['1分钟', '5分钟', '15分钟', '30分钟', '60分钟', '日线']) + kg = BarGenerator(base_freq='1分钟', freqs=['5分钟', '15分钟', '30分钟', '60分钟', '日线'], max_count=3000) for row in tqdm(bars[:150000], desc='init kg'): kg.update(row) @@ -61,7 +69,7 @@ def test_advanced_trader_with_t0(): ]), ] long_pos = PositionLong(symbol='000001.SH', hold_long_a=0.5, hold_long_b=0.8, hold_long_c=1, T0=True) - ct = CzscAdvancedTrader(kg=kg, get_signals=get_default_signals, long_events=events, long_pos=long_pos) + ct = CzscAdvancedTrader(bg=kg, get_signals=get_default_signals, long_events=events, long_pos=long_pos) assert len(ct.s) == 16 for row in tqdm(bars[150000:], desc="trade"): ct.update(row) @@ -78,7 +86,7 @@ def test_advanced_trader_with_t0(): def test_advanced_trader_without_t0(): bars = read_1min() - kg = KlineGenerator(max_count=3000, freqs=['1分钟', '5分钟', '15分钟', '30分钟', '60分钟', '日线']) + kg = BarGenerator(base_freq='1分钟', freqs=['5分钟', '15分钟', '30分钟', '60分钟', '日线'], max_count=3000) for row in tqdm(bars[:150000], desc='init kg'): kg.update(row) @@ -104,7 +112,7 @@ def test_advanced_trader_without_t0(): ]), ] long_pos = PositionLong(symbol='000001.SH', hold_long_a=0.5, hold_long_b=0.8, hold_long_c=1, T0=False) - ct = CzscAdvancedTrader(kg=kg, get_signals=get_default_signals, long_events=events, long_pos=long_pos) + ct = CzscAdvancedTrader(bg=kg, get_signals=get_default_signals, long_events=events, long_pos=long_pos) assert len(ct.s) == 16 for row in tqdm(bars[150000:], desc="trade"): ct.update(row) diff --git a/test/test_cobra_backtest.py b/test/test_cobra_backtest.py index e62ac10dc..6c26455a1 100644 --- a/test/test_cobra_backtest.py +++ b/test/test_cobra_backtest.py @@ -30,11 +30,3 @@ def test_cobra_backtest(): assert len(pairs) == 65 assert pf['累计收益(%)'] > pf['基准收益(%)'] - event = Event(name="多头过滤", operate=Operate.LO, factors=[ - Factor(name="多头过滤", signals_all=[ - Signal("30分钟_倒1K_DIF多空_多头_任意_任意_0"), - ]), - ]) - pairs, pf = bt.one_event_estimator(signals, event) - assert len(pairs) == 8 - assert pf['累计收益(%)'] > pf['基准收益(%)'] diff --git a/test/test_cobra_utils.py b/test/test_cobra_utils.py index 2cb1cc56b..5b4eda0ad 100644 --- a/test/test_cobra_utils.py +++ b/test/test_cobra_utils.py @@ -1,14 +1,11 @@ # coding: utf-8 -import sys import warnings import os import numpy as np import pandas as pd -from copy import deepcopy import czsc from czsc.cobra.utils import down_cross_count, kdj_gold_cross, kdj_dead_cross -from czsc.cobra.utils import drop_duplicates_by_window from czsc.objects import RawBar from czsc.enum import Freq @@ -20,7 +17,6 @@ def test_kdj_cross(): file_kline = os.path.join(cur_path, "data/000001.SH_D.csv") kline = pd.read_csv(file_kline, encoding="utf-8") - # bars = kline.to_dict("records") bars = [RawBar(symbol=row['symbol'], id=i, freq=Freq.D, open=row['open'], dt=row['dt'], close=row['close'], high=row['high'], low=row['low'], vol=row['vol']) for i, row in kline.iterrows()] @@ -39,26 +35,3 @@ def test_cross_count(): assert down_cross_count(np.array(x1), np.array(x2)) == 2 assert down_cross_count(x2, x1) == 2 assert down_cross_count(np.array(x2), np.array(x1)) == 2 - - -def test_drop_duplicates(): - seq1 = [1, 2, 3, 3, 2, 4, 5, 6, 8, 1] - seq1_ = drop_duplicates_by_window(seq1, -1, window_size=5) - assert seq1_[3] == -1 - assert seq1_[4] == -1 - - seq2 = ['x1', 'x1', 'x2', 'x3', 'x4', 'x3', 'x2', 'x5', 'x6'] - seq2_ = drop_duplicates_by_window(deepcopy(seq2), 'other', window_size=5) - assert seq2_[1] == seq2_[5] == seq2_[6] == 'other' - seq2_ = drop_duplicates_by_window(deepcopy(seq2), 'other', window_size=3) - assert seq2_[1] == seq2_[5] == 'other' - - seq3 = [0.1, 1.2, 2.3, 3.2, 1.2, 2.5, 0.1, 2.3] - seq3_ = drop_duplicates_by_window(deepcopy(seq3), -0.1, window_size=5) - assert seq3_[4] == -0.1 - seq3_ = drop_duplicates_by_window(deepcopy(seq3), -0.1, window_size=10) - assert seq3_[4] == seq3_[6] == seq3_[7] == -0.1 - - # 长序列性能:500 ms ± 3.14 ms per loop (mean ± std. dev. of 10 runs, 1 loop each) - # seq = list(range(2000000)) - # %timeit -r 10 seq_ = drop_duplicates_by_window(seq, -1, window_size=5) diff --git a/test/test_traders_daily.py b/test/test_traders_daily.py index 33e4723c7..93e0639e5 100644 --- a/test/test_traders_daily.py +++ b/test/test_traders_daily.py @@ -4,12 +4,14 @@ email: zeng_bin8888@163.com create_dt: 2021/11/1 23:22 """ +from czsc.utils.bar_generator import BarGenerator from czsc.traders.daily import CzscDailyTrader, KlineGeneratorD from test.test_analyze import read_daily, get_user_signals def test_daily_trader(): bars = read_daily() + kg = KlineGeneratorD(freqs=['日线', '周线', '月线']) for bar in bars[:1000]: kg.update(bar) @@ -23,3 +25,16 @@ def test_daily_trader(): assert len(signals) == 2332 + kg = BarGenerator(base_freq='日线', freqs=['周线', '月线']) + for bar in bars[:1000]: + kg.update(bar) + + ct = CzscDailyTrader(kg, get_user_signals) + + signals = [] + for bar in bars[1000:]: + ct.update(bar) + signals.append(dict(ct.s)) + + assert len(signals) == 2332 +