diff --git a/.github/workflows/pythonpackage.yml b/.github/workflows/pythonpackage.yml index f6ca04765..5f5f63a04 100644 --- a/.github/workflows/pythonpackage.yml +++ b/.github/workflows/pythonpackage.yml @@ -5,7 +5,7 @@ name: Python package on: push: - branches: [ master, V0.9.46 ] + branches: [ master, V0.9.47 ] pull_request: branches: [ master ] diff --git a/czsc/__init__.py b/czsc/__init__.py index 436f9c744..ef245686e 100644 --- a/czsc/__init__.py +++ b/czsc/__init__.py @@ -88,6 +88,7 @@ home_path, DiskCache, disk_cache, + clear_cache, get_dir_size, empty_cache_path, print_df_sample, @@ -154,12 +155,13 @@ rolling_slope, rolling_tanh, feature_adjust, + normalize_corr, ) -__version__ = "0.9.46" +__version__ = "0.9.47" __author__ = "zengbin93" __email__ = "zeng_bin8888@163.com" -__date__ = "20240318" +__date__ = "20240328" def welcome(): diff --git a/czsc/cmd.py b/czsc/cmd.py index 076531dac..fc231a099 100644 --- a/czsc/cmd.py +++ b/czsc/cmd.py @@ -17,8 +17,7 @@ def czsc(): @czsc.command() -def aphorism(): +def a(): """随机输出一条缠中说禅良言警句""" from czsc.aphorism import print_one - print_one() diff --git a/czsc/connectors/cooperation.py b/czsc/connectors/cooperation.py index 79b56247d..027392795 100644 --- a/czsc/connectors/cooperation.py +++ b/czsc/connectors/cooperation.py @@ -56,46 +56,52 @@ def get_symbols(name, **kwargs): :return: """ if name == "股票": - df = dc.stock_basic(nobj=1, status=1) + df = dc.stock_basic(nobj=1, status=1, ttl=3600 * 6) symbols = [f"{row['code']}#STOCK" for _, row in df.iterrows()] return symbols if name == "ETF": - df = dc.etf_basic(v=2, fields='code,name') - dfk = dc.pro_bar(trade_date="2023-11-17", asset="e", v=2) + df = dc.etf_basic(v=2, fields='code,name', ttl=3600 * 6) + dfk = dc.pro_bar(trade_date="2024-04-02", asset="e", v=2) df = df[df['code'].isin(dfk['code'])].reset_index(drop=True) symbols = [f"{row['code']}#ETF" for _, row in df.iterrows()] return symbols if name == "A股指数": # 指数 https://s0cqcxuy3p.feishu.cn/wiki/KuSAweAAhicvsGk9VPTc1ZWKnAd - df = dc.index_basic(v=2, market='SSE,SZSE') + df = dc.index_basic(v=2, market='SSE,SZSE', ttl=3600 * 6) symbols = [f"{row['code']}#INDEX" for _, row in df.iterrows()] return symbols if name == "南华指数": - df = dc.index_basic(v=2, market='NH') + df = dc.index_basic(v=2, market='NH', ttl=3600 * 6) symbols = [row['code'] for _, row in df.iterrows()] return symbols if name == "期货主力": - kline = dc.future_klines(trade_date="20231101") + kline = dc.future_klines(trade_date="20240402", ttl=3600 * 6) return kline['code'].unique().tolist() + if name.upper() == "ALL": + symbols = get_symbols("股票") + get_symbols("ETF") + symbols += get_symbols("A股指数") + get_symbols("南华指数") + get_symbols("期货主力") + return symbols + raise ValueError(f"{name} 分组无法识别,获取标的列表失败!") def get_min_future_klines(code, sdt, edt, freq='1m'): """分段获取期货1分钟K线后合并""" # dates = pd.date_range(start=sdt, end=edt, freq='1M') - dates = pd.date_range(start=sdt, end=edt, freq='30D') + dates = pd.date_range(start=sdt, end=edt, freq='120D') dates = [d.strftime('%Y%m%d') for d in dates] + [sdt, edt] dates = sorted(list(set(dates))) rows = [] for sdt_, edt_ in tqdm(zip(dates[:-1], dates[1:]), total=len(dates) - 1): - df = dc.future_klines(code=code, sdt=sdt_, edt=edt_, freq=freq) + ttl = 60 if pd.to_datetime(edt_).date() == datetime.now().date() else -1 + df = dc.future_klines(code=code, sdt=sdt_, edt=edt_, freq=freq, ttl=ttl) if df.empty: continue logger.info(f"{code}获取K线范围:{df['dt'].min()} - {df['dt'].max()}") @@ -104,8 +110,19 @@ def get_min_future_klines(code, sdt, edt, freq='1m'): df = pd.concat(rows, ignore_index=True) df.rename(columns={'code': 'symbol'}, inplace=True) df['dt'] = pd.to_datetime(df['dt']) - df = df.drop_duplicates(subset=['dt', 'symbol'], keep='last') + + if code in ['SFIC9001', 'SFIF9001', 'SFIH9001']: + # 股指:仅保留 09:31 - 11:30, 13:01 - 15:00 + dt1 = datetime.strptime("09:31:00", "%H:%M:%S") + dt2 = datetime.strptime("11:30:00", "%H:%M:%S") + c1 = (df['dt'].dt.time >= dt1.time()) & (df['dt'].dt.time <= dt2.time()) + + dt3 = datetime.strptime("13:01:00", "%H:%M:%S") + dt4 = datetime.strptime("15:00:00", "%H:%M:%S") + c2 = (df['dt'].dt.time >= dt3.time()) & (df['dt'].dt.time <= dt4.time()) + + df = df[c1 | c2].copy().reset_index(drop=True) return df @@ -119,10 +136,14 @@ def get_raw_bars(symbol, freq, sdt, edt, fq='前复权', **kwargs): :param edt: 结束时间 :param fq: 除权类型,可选值:'前复权', '后复权', '不复权' :param kwargs: - :return: + :return: RawBar 对象列表 or DataFrame + + >>> from czsc.connectors import cooperation as coo + >>> df = coo.get_raw_bars(symbol="000001.SH#INDEX", freq="日线", sdt="2001-01-01", edt="2021-12-31", fq='后复权', raw_bars=False) """ freq = czsc.Freq(freq) raw_bars = kwargs.get('raw_bars', True) + ttl = kwargs.get('ttl', -1) if "SH" in symbol or "SZ" in symbol: fq_map = {"前复权": "qfq", "后复权": "hfq", "不复权": None} @@ -131,14 +152,17 @@ def get_raw_bars(symbol, freq, sdt, edt, fq='前复权', **kwargs): code, asset = symbol.split("#") if freq.value.endswith('分钟'): - df = dc.pro_bar(code=code, sdt=sdt, edt=edt, freq='min', adj=adj, asset=asset[0].lower(), v=2) + df = dc.pro_bar(code=code, sdt=sdt, edt=edt, freq='min', adj=adj, asset=asset[0].lower(), v=2, ttl=ttl) df = df[~df['dt'].str.endswith("09:30:00")].reset_index(drop=True) - else: - df = dc.pro_bar(code=code, sdt=sdt, edt=edt, freq='day', adj=adj, asset=asset[0].lower(), v=2) + df.rename(columns={'code': 'symbol'}, inplace=True) + df['dt'] = pd.to_datetime(df['dt']) + return czsc.resample_bars(df, target_freq=freq, raw_bars=raw_bars, base_freq='1分钟') - df.rename(columns={'code': 'symbol'}, inplace=True) - df['dt'] = pd.to_datetime(df['dt']) - return czsc.resample_bars(df, target_freq=freq, raw_bars=raw_bars) + else: + df = dc.pro_bar(code=code, sdt=sdt, edt=edt, freq='day', adj=adj, asset=asset[0].lower(), v=2, ttl=ttl) + df.rename(columns={'code': 'symbol'}, inplace=True) + df['dt'] = pd.to_datetime(df['dt']) + return czsc.resample_bars(df, target_freq=freq, raw_bars=raw_bars) if symbol.endswith("9001"): # https://s0cqcxuy3p.feishu.cn/wiki/WLGQwJLWQiWPCZkPV7Xc3L1engg @@ -148,19 +172,23 @@ def get_raw_bars(symbol, freq, sdt, edt, fq='前复权', **kwargs): freq_rd = '1m' if freq.value.endswith('分钟') else '1d' if freq.value.endswith('分钟'): df = get_min_future_klines(code=symbol, sdt=sdt, edt=edt, freq='1m') + df['amount'] = df['vol'] * df['close'] + df = df[['symbol', 'dt', 'open', 'close', 'high', 'low', 'vol', 'amount']].copy().reset_index(drop=True) + df['dt'] = pd.to_datetime(df['dt']) + return czsc.resample_bars(df, target_freq=freq, raw_bars=raw_bars, base_freq='1分钟') + else: - df = dc.future_klines(code=symbol, sdt=sdt, edt=edt, freq=freq_rd) + df = dc.future_klines(code=symbol, sdt=sdt, edt=edt, freq=freq_rd, ttl=ttl) df.rename(columns={'code': 'symbol'}, inplace=True) - - df['amount'] = df['vol'] * df['close'] - df = df[['symbol', 'dt', 'open', 'close', 'high', 'low', 'vol', 'amount']].copy().reset_index(drop=True) - df['dt'] = pd.to_datetime(df['dt']) - return czsc.resample_bars(df, target_freq=freq, raw_bars=raw_bars) + df['amount'] = df['vol'] * df['close'] + df = df[['symbol', 'dt', 'open', 'close', 'high', 'low', 'vol', 'amount']].copy().reset_index(drop=True) + df['dt'] = pd.to_datetime(df['dt']) + return czsc.resample_bars(df, target_freq=freq, raw_bars=raw_bars) if symbol.endswith(".NH"): if freq != Freq.D: raise ValueError("南华指数只支持日线数据") - df = dc.nh_daily(code=symbol, sdt=sdt, edt=edt) + df = dc.nh_daily(code=symbol, sdt=sdt, edt=edt, ttl=ttl) raise ValueError(f"symbol {symbol} 无法识别,获取数据失败!") diff --git a/czsc/connectors/tq_connector.py b/czsc/connectors/tq_connector.py index c415abe01..5ee95b304 100644 --- a/czsc/connectors/tq_connector.py +++ b/czsc/connectors/tq_connector.py @@ -183,26 +183,6 @@ def format_kline(df, freq=Freq.F1): } -def is_trade_time(trade_time: Optional[str] = None): - """判断当前是否是交易时间""" - if trade_time is None: - trade_time = datetime.now().strftime("%H:%M:%S") - - if trade_time >= "09:00:00" and trade_time <= "11:30:00": - return True - - if trade_time >= "13:00:00" and trade_time <= "15:00:00": - return True - - if trade_time >= "21:00:00" and trade_time <= "23:59:59": - return True - - if trade_time >= "00:00:00" and trade_time <= "02:30:00": - return True - - return False - - def get_daily_backup(api: TqApi, **kwargs): """获取每日账户中需要备份的信息 @@ -238,6 +218,18 @@ def get_daily_backup(api: TqApi, **kwargs): return backup +def is_trade_time(quote): + """判断当前是否是交易时间""" + trade_time = pd.Timestamp.now().strftime("%H:%M:%S") + times = quote["trading_time"]['day'] + quote["trading_time"]['night'] + + for sdt, edt in times: + if trade_time >= sdt and trade_time <= edt: + logger.info(f"当前时间:{trade_time},交易时间:{sdt} - {edt}") + return True + return False + + def adjust_portfolio(api: TqApi, portfolio, account=None, **kwargs): """调整账户组合 @@ -255,11 +247,21 @@ def adjust_portfolio(api: TqApi, portfolio, account=None, **kwargs): :param kwargs: dict, 其他参数 """ + timeout = kwargs.get("timeout", 600) + start_time = datetime.now() + symbol_infos = {} for symbol, conf in portfolio.items(): quote = api.get_quote(symbol) + if not is_trade_time(quote): + logger.warning(f"{symbol} 当前时间不是交易时间,跳过调仓") + continue + + lots = conf.get("target_volume", None) + if lots is None: + logger.warning(f"{symbol} 目标手数为 None,跳过调仓") + continue - lots = conf.get("target_volume", 0) price = conf.get("price", "PASSIVE") offset_priority = conf.get("offset_priority", "今昨,开") @@ -281,16 +283,17 @@ def adjust_portfolio(api: TqApi, portfolio, account=None, **kwargs): logger.info(f"调整仓位:{quote.datetime} - {contract}; 目标持仓:{lots}手; 当前持仓:{target_pos._pos.pos}手") - if target_pos._pos.pos == lots: + if target_pos._pos.pos == lots or target_pos.is_finished(): completed.append(True) - logger.info(f"调仓完成:{quote.datetime} - {contract}; {lots}手") + logger.info(f"调仓完成:{quote.datetime} - {contract}; 目标持仓:{lots}手; 当前持仓:{target_pos._pos.pos}手") else: completed.append(False) if all(completed): break - if kwargs.get("close_api", True): - api.close() + if (datetime.now() - start_time).seconds > timeout: + logger.error(f"调仓超时,已运行 {timeout} 秒") + break return api diff --git a/czsc/features/utils.py b/czsc/features/utils.py index 1d69826cb..3d7b4bd78 100644 --- a/czsc/features/utils.py +++ b/czsc/features/utils.py @@ -256,6 +256,52 @@ def __lr_slope(x): return df +def normalize_corr(df: pd.DataFrame, fcol, ycol=None, **kwargs): + """标准化因子与收益相关性为正数 + + 方法说明:对因子进行滚动相关系数计算,因子乘以滚动相关系数的符号 + + **注意:** + + 1. simple 模式下,计算过程有一定的未来信息泄露,在回测中使用时需要注意 + 2. rolling 模式下,计算过程依赖 window 参数,有可能调整后相关性为负数 + + :param df: pd.DataFrame, 必须包含 dt、symbol、price 列,以及因子列 + :param fcol: str 因子列名 + :param kwargs: dict + + - window: int, 滚动窗口大小 + - min_periods: int, 最小计算周期 + - mode: str, 计算方法, rolling 表示使用滚动调整相关系数,simple 表示使用镜像反转相关系数 + - copy: bool, 是否复制 df + + :return: pd.DataFrame + """ + window = kwargs.get("window", 1000) + min_periods = kwargs.get("min_periods", 5) + mode = kwargs.get("mode", "rolling") + if kwargs.get("copy", False): + df = df.copy() + + df = df.sort_values(['symbol', 'dt'], ascending=True).reset_index(drop=True) + for symbol, dfg in df.groupby("symbol"): + dfg['ycol'] = dfg['price'].pct_change().shift(-1) + + if mode.lower() == "rolling": + dfg['corr_sign'] = np.sign(dfg[fcol].rolling(window=window, min_periods=min_periods).corr(dfg['ycol'])) + dfg[fcol] = (dfg['corr_sign'].shift(3) * dfg[fcol]).fillna(0) + + elif mode.lower() == "simple": + corr_sign = np.sign(dfg[fcol].corr(dfg['ycol'])) + dfg[fcol] = corr_sign * dfg[fcol] + + else: + raise ValueError(f"Unknown mode: {mode}") + + df.loc[df['symbol'] == symbol, fcol] = dfg[fcol] + return df + + def feature_adjust_V230101(df: pd.DataFrame, fcol, **kwargs): """特征调整函数:对特征进行调整,使其符合持仓权重的定义 @@ -312,6 +358,7 @@ def feature_adjust(df: pd.DataFrame, fcol, method, **kwargs): :param fcol: str, 因子列名 :param method: str, 调整方法 + - KEEP: 直接使用原始因子值作为权重 - V230101: 对因子进行滚动相关系数计算,然后对因子值用 maxabs_scale 进行归一化,最后乘以滚动相关系数的符号 - V240323: 对因子进行滚动相关系数计算,然后对因子值用 scale + tanh 进行归一化,最后乘以滚动相关系数的符号 @@ -322,6 +369,10 @@ def feature_adjust(df: pd.DataFrame, fcol, method, **kwargs): :return: pd.DataFrame, 新增 weight 列 """ + if method == "KEEP": + df["weight"] = df[fcol] + return df + if method == "V230101": return feature_adjust_V230101(df, fcol, **kwargs) elif method == "V240323": diff --git a/czsc/signals/__init__.py b/czsc/signals/__init__.py index 15b1ce6d8..2deec5ffd 100644 --- a/czsc/signals/__init__.py +++ b/czsc/signals/__init__.py @@ -5,12 +5,6 @@ create_dt: 2021/11/21 17:48 describe: 信号函数 """ -# ====================================================================================================================== -# 以下是 0.9.1 开始的新标准下实现的信号函数,规范定义: -# 1. 前缀3个字符区分信号类别 -# 2. 后缀 V221107 之类的标识同一个信号函数的不同版本 -# ====================================================================================================================== - from czsc.signals.cxt import ( cxt_fx_power_V221107, cxt_first_buy_V221126, @@ -208,6 +202,7 @@ tas_macd_bc_V230804, tas_macd_bc_ubi_V230804, tas_slope_V231019, + tas_macd_bc_V240307, ) from czsc.signals.pos import ( @@ -263,3 +258,11 @@ zdy_dif_V230528, pressure_support_V240222, ) + + +from czsc.signals.xls import ( + xl_bar_position_V240328, + xl_bar_trend_V240329, + xl_bar_trend_V240330, + xl_bar_trend_V240331, +) diff --git a/czsc/signals/jcc.py b/czsc/signals/jcc.py index 379cd9052..9559b3115 100644 --- a/czsc/signals/jcc.py +++ b/czsc/signals/jcc.py @@ -6,7 +6,7 @@ describe: jcc 是 Japanese Candlestick Charting 的缩写,日本蜡烛图技术 """ import numpy as np -from typing import List, Any +from typing import List from collections import OrderedDict from czsc import CZSC from czsc.objects import Signal, RawBar, Direction diff --git a/czsc/signals/pos.py b/czsc/signals/pos.py index 5e1e786d4..259d6aee9 100644 --- a/czsc/signals/pos.py +++ b/czsc/signals/pos.py @@ -3,7 +3,7 @@ author: zengbin93 email: zeng_bin8888@163.com create_dt: 2023/4/14 19:27 -describe: +describe: """ from czsc.analyze import CZSC from collections import OrderedDict @@ -139,7 +139,7 @@ def pos_bar_stop_V230524(cat: CzscTrader, **kwargs) -> OrderedDict: 多头止损逻辑如下,反之为空头止损逻辑: - 1. 从多头开仓点开始,在给定对的K线周期 freq1 上向前找 N 个K线,记为 F1 + 1. 从多头开仓点开始,在给定的K线周期 freq1 上向前找 N 个K线,记为 F1 2. 将这 N 个K线的最低点,记为 L1,如果最新价跌破 L1,则止损 **信号列表:** @@ -149,10 +149,12 @@ def pos_bar_stop_V230524(cat: CzscTrader, **kwargs) -> OrderedDict: :param cat: CzscTrader对象 :param kwargs: 参数字典 + - pos_name: str,开仓信号的名称 - freq1: str,给定的K线周期 - n: int,向前找的K线个数,默认为 3 - :return: + + :return: OrderedDict """ pos_name = kwargs["pos_name"] freq1 = kwargs["freq1"] @@ -305,7 +307,7 @@ def pos_profit_loss_V230624(cat: CzscTrader, **kwargs) -> OrderedDict: - Signal('日线通道突破_60分钟YKB20N3_盈亏比判断V230624_多头止损_任意_任意_0') - Signal('日线通道突破_60分钟YKB20N3_盈亏比判断V230624_多头达标_任意_任意_0') - Signal('日线通道突破_60分钟YKB20N3_盈亏比判断V230624_空头达标_任意_任意_0') - + :param cat: CzscTrader对象 :param kwargs: 参数字典 @@ -456,5 +458,3 @@ def pos_holds_V230807(cat: CzscTrader, **kwargs) -> OrderedDict: v1 = '空头保本' return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) - - diff --git a/czsc/signals/tas.py b/czsc/signals/tas.py index 108fcf99a..f275734d4 100644 --- a/czsc/signals/tas.py +++ b/czsc/signals/tas.py @@ -3469,3 +3469,60 @@ def tas_double_ma_V240208(c: CZSC, **kwargs) -> OrderedDict: return create_single_signal(k1=k1, k2=k2, k3=k3, v1='空头') return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def tas_macd_bc_V240307(c: CZSC, **kwargs) -> OrderedDict: + """MACD柱子辅助背驰判断 + + 参数模板:"{freq}_D{di}N{n}柱子背驰_BS辅助V240307" + + **信号逻辑:** + + 以顶背驰为例,最近N根K线的MACD柱子都大于0,且最近一个柱子高点小于前面的柱子高点,认为是顶背驰,做空;反之,做多。 + + **信号列表:** + + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_底背驰_第1次_任意_0') + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_底背驰_第2次_任意_0') + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_底背驰_第3次_任意_0') + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_顶背驰_第1次_任意_0') + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_顶背驰_第2次_任意_0') + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_顶背驰_第3次_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + di = int(kwargs.get('di', 1)) + n = int(kwargs.get('n', 20)) + + freq = c.freq.value + k1, k2, k3 = f"{freq}_D{di}N{n}柱子背驰_BS辅助V240307".split('_') + v1, v2 = '其他', '其他' + cache_key = update_macd_cache(c) + if len(c.bars_raw) < 7 + n: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars = get_sub_elements(c.bars_raw, di=di, n=n) + macd = [x.cache[cache_key]['macd'] for x in bars] + n = len(macd) + + # 计算 MACD 柱子的顶和底序列 + gs = [i for i in range(1, n - 1) if macd[i - 1] < macd[i] > macd[i + 1] and macd[i] > 0] + ds = [i for i in range(1, n - 1) if macd[i - 1] > macd[i] < macd[i + 1] and macd[i] < 0] + + if macd[-1] > 0 and len(gs) >= 2 and macd[gs[-1]] < macd[gs[-2]] and gs[-1] - gs[-2] > 2: + macd_sub = macd[gs[-2]:] + # 两个顶之间的柱子没有出现大的负值 + if abs(np.sum([x for x in macd_sub if x < 0])) < np.std(np.abs(macd_sub)): + v1 = '顶背驰' + v2 = f"第{n - gs[-1] - 1}次" + + if macd[-1] < 0 and len(ds) >= 2 and macd[ds[-1]] > macd[ds[-2]] and ds[-1] - ds[-2] > 2: + macd_sub = macd[ds[-2]:] + # 两个底之间的柱子没有出现大的正值 + if abs(np.sum([x for x in macd_sub if x > 0])) < np.std(np.abs(macd_sub)): + v1 = '底背驰' + v2 = f"第{n - ds[-1] - 1}次" + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) diff --git a/czsc/signals/xls.py b/czsc/signals/xls.py new file mode 100644 index 000000000..d2023c2fe --- /dev/null +++ b/czsc/signals/xls.py @@ -0,0 +1,234 @@ +# -*- coding: utf-8 -*- +""" +author: zengbin93 +email: zeng_bin8888@163.com +create_dt: 2024/04/07 22:17 +describe: 谢磊贡献的信号函数 +""" +import numpy as np +from typing import Union, List +from collections import OrderedDict +from czsc.utils import create_single_signal, get_sub_elements +from czsc.signals.tas import update_ma_cache +from czsc.signals.jcc import check_szx +from czsc import CZSC + + +def xl_bar_position_V240328(c: CZSC, **kwargs) -> OrderedDict: + """相对位置信号; 贡献者:谢磊 + + 参数模板:"{freq}_N{n}_BS辅助V240328" + + **信号逻辑:** + + 1. 用当前价格与EMA的比值做一个偏离度指标 + 2. 当偏离度越高就越有可能是相对低点的位置 + + **信号列表:** + - Signal('30分钟_N10_BS辅助V240328_相对低点_任意_任意_0') + - Signal('30分钟_N10_BS辅助V240328_相对高点_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: + :return: 信号识别结果 + """ + + n = int(kwargs.get('n', 10)) + freq = c.freq.value + k1, k2, k3 = f"{freq}_N{n}_BS辅助V240328".split('_') + v1 = "其他" + + if len(c.bars_raw) < n + 1: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars = get_sub_elements(c.bars_raw, di=1, n=n + 2 * n) + close = np.array([x.close for x in bars]) + cache_key_ema = update_ma_cache(c, ma_type='EMA', timeperiod=n) + ema = np.array([x.cache[cache_key_ema] for x in bars]) + nor = (close - ema) / ema + + if nor[-1] < np.quantile(nor, 0.3, method='midpoint'): + v1 = '相对低点' + + elif nor[-1] > np.quantile(nor, 0.7, method='midpoint'): + v1 = '相对高点' + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + +def xl_bar_trend_V240329(c: CZSC, **kwargs) -> OrderedDict: + """底部反转形态信号; 贡献者:谢磊 + + 正向的十字孕线(Bullish Harami Cross)是一种看涨的蜡烛图形态,属于孕线形态的一种变体。 + 这种形态出现在下跌趋势的末端,可能预示着趋势即将反转向上。正向的十字孕线由两根蜡烛图组成: + 第一根是一个长实体的阴线,显示了强劲的下跌趋势;第二根是一个十字线(或接近十字线的形态), + 其开盘价和收盘价都处于第一根阴线实体的中部以下,但实体部分较小,且颜色可以是阳线或阴线。 + + 参数模板:"{freq}_N{n}M{m}_十字线反转V240329" + + **信号逻辑:** + + 1, 十字线定义,(h -l) / (c - o) 的绝对值大于 th,或 c == o + + + **信号列表:** + - Signal('30分钟_N5M5_十字线反转V240329_底部十字孕线_任意_任意_0') + - Signal('30分钟_N5M5_十字线反转V240329_顶部十字孕线_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: + :return: 信号识别结果 + """ + n = int(kwargs.get('n', 10)) + m = int(kwargs.get('m', 5)) + freq = c.freq.value + k1, k2, k3 = f"{freq}_N{n}M{m}_十字线反转V240329".split('_') + v1, v2 = "其他", "其他" + + if len(c.bars_raw) < n + 1: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + bar1, bar2 = get_sub_elements(c.bars_raw, di=1, n=2) + if check_szx(bar2, n) and bar1.close < bar1.open and (bar1.open - bar1.close) / (bar1.high - bar1.low) * 10 >= m: + v1 = '底部十字孕线' + + if check_szx(bar2, n) and bar1.close > bar1.open and (bar1.close - bar1.open) / (bar1.high - bar1.low) * 10 >= m: + v1 = '顶部十字孕线' + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + +def xl_bar_trend_V240330(c: CZSC, **kwargs) -> OrderedDict: + """完全分类,均线金叉过滤信号; 贡献者:谢磊 + + 参数模板:"{freq}_N{n}M{m}#{ma_type}_双均线过滤V240330" + + **信号逻辑:** + + 1, 当25日均线大于350日均线,看多 + 2. 当25日均线小于350日均线,看空 + 3. 均线类型可选,平均线,EMA线。 + + **信号列表:** + + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看空_第03次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看空_第04次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看空_第05次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看空_第06次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看空_第07次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看空_第08次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看空_第09次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看空_第10次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看多_第01次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看多_第02次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看多_第03次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看多_第04次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看空_第01次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看空_第02次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看多_第05次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看多_第06次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看多_第07次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看多_第08次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看多_第09次_任意_0') + - Signal('15分钟_N5M21#SMA_双均线过滤V240330_看多_第10次_任意_0') + + :param c: CZSC对象 + :param kwargs: + :return: 信号识别结果 + """ + n = int(kwargs.get('n', 5)) + m = int(kwargs.get('m', 21)) + ma_type = kwargs.get('ma_type', 'SMA') + + freq = c.freq.value + k1, k2, k3 = f"{freq}_N{n}M{m}#{ma_type}_双均线过滤V240330".split('_') + v1, v2 = "其他", "其他" + + if len(c.bars_raw) < m + 1: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + cache_key1 = update_ma_cache(c, ma_type=ma_type, timeperiod=n) + cache_key2 = update_ma_cache(c, ma_type=ma_type, timeperiod=m) + bars = get_sub_elements(c.bars_raw, di=1, n=m + 1) + cache1 = [x.cache[cache_key1] for x in bars] + cache2 = [x.cache[cache_key2] for x in bars] + + def _countN(x1: Union[List, np.array], x2: Union[List, np.array]): # type: ignore + """输入两个序列,计算 次数 + + :param x1: list + :param x2: list + :return: int + """ + x = np.array(x1) < np.array(x2) + y = np.array(x1) > np.array(x2) + num = 0 + for i in range(len(x) - 1): + b1, b2 = x[i], x[i + 1] + if b2 and b1 != b2: + num = 1 + elif b2 and b1 == b2: + num += 1 + b3, b4 = y[i], y[i + 1] + if b4 and b3 != b4: + num = 1 + elif b4 and b3 == b4: + num += 1 + + if num >= 10: + num = 10 + return num + + num = _countN(cache1, cache2) + num = f"第{str(num).zfill(2)}次" + if cache1[-1] > cache2[-1]: + v1 = '看多' + v2 = num + + elif cache1[-1] < cache2[-1]: + v1 = '看空' + v2 = num + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + +def xl_bar_trend_V240331(c: CZSC, **kwargs) -> OrderedDict: + """突破信号; 贡献者:谢磊 + + 参数模板:"{freq}_N{n}_突破信号V240331" + + **信号逻辑:** + + 1, 突破前N日最高价,入场,做多 + 2. 跌破前N日最低价,入场,做空 + + **信号列表:** + + - Signal('30分钟_N20_突破信号V240331_做多_任意_任意_0') + - Signal('30分钟_N20_突破信号V240331_做空_任意_任意_0') + + :param c: CZSC对象 + :param kwargs: + :return: 信号识别结果 + """ + n = int(kwargs.get('n', 20)) + freq = c.freq.value + k1, k2, k3 = f"{freq}_N{n}_突破信号V240331".split('_') + v1 = "其他" + + if len(c.bars_raw) < n + 1: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars2 = get_sub_elements(c.bars_raw, di=1, n=n + 1) + hh = max([x.high for x in bars2[0:-1]]) + ll = min([x.low for x in bars2[0:-1]]) + _high = bars2[-1].high + _low = bars2[-1].low + + if _high >= hh: + v1 = '做多' + + elif _low <= ll: + v1 = '做空' + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) diff --git a/czsc/traders/rwc.py b/czsc/traders/rwc.py index 52225bbd3..205cf0579 100644 --- a/czsc/traders/rwc.py +++ b/czsc/traders/rwc.py @@ -80,8 +80,9 @@ def __init__(self, strategy_name, redis_url=None, connection_pool=None, send_hea def set_metadata(self, base_freq, description, author, outsample_sdt, **kwargs): """设置策略元数据""" key = f'{self.key_prefix}:META:{self.strategy_name}' + overwrite = kwargs.pop('overwrite', False) if self.r.exists(key): - if not kwargs.pop('overwrite', False): + if not overwrite: logger.warning(f'已存在 {self.strategy_name} 的元数据,如需覆盖请设置 overwrite=True') return else: @@ -457,23 +458,30 @@ def get_strategy_weights(strategy_name, redis_url=None, connection_pool=None, ke :param connection_pool: redis.ConnectionPool, redis连接池 :param key_prefix: str, redis中key的前缀,默认为 Weights :param kwargs: dict, 其他参数 + + - symbols : list, 品种列表, 默认为None, 即获取所有品种的权重 + - sdt : str, 开始时间, eg: 20210924 10:19:00 + - edt : str, 结束时间, eg: 20220924 10:19:00 + - only_last : boolean, 是否只保留每个品种最近一次权重,默认为False + :return: pd.DataFrame """ + kwargs.pop("send_heartbeat", None) rwc = RedisWeightsClient(strategy_name, redis_url=redis_url, connection_pool=connection_pool, - key_prefix=key_prefix, **kwargs) + key_prefix=key_prefix, send_heartbeat=False, **kwargs) sdt = kwargs.get("sdt") edt = kwargs.get("edt") symbols = kwargs.get("symbols") only_last = kwargs.get("only_last", False) if only_last: - # 只保留每个品种最近一次权重 + # 保留每个品种最近一次权重 df = rwc.get_last_weights(ignore_zero=False) return df df = rwc.get_all_weights(sdt=sdt, edt=edt) if symbols: - # 只保留指定品种的权重 + # 保留指定品种的权重 not_in = [x for x in symbols if x not in df['symbol'].unique()] if not_in: logger.warning(f"{strategy_name} 中没有 {not_in} 的权重记录") diff --git a/czsc/traders/weight_backtest.py b/czsc/traders/weight_backtest.py index 4ce0915d4..3c5f22dec 100644 --- a/czsc/traders/weight_backtest.py +++ b/czsc/traders/weight_backtest.py @@ -205,7 +205,9 @@ def stoploss_by_direction(dfw, stoploss=0.03, **kwargs): # 止损:同一个订单下,min_hold_returns < -stoploss时,后续weight置为0 dfg['is_stop'] = (dfg['min_hold_returns'] < -stoploss) & (dfg['order_id'] == dfg['order_id'].shift(1)) - dfg['weight'] = np.where((dfg['is_stop'].shift(1)) & (dfg['order_id'] == dfg['order_id'].shift(1)), 0, dfg['weight']) + c1 = dfg['is_stop'].shift(1) & (dfg['order_id'] == dfg['order_id'].shift(1)) + dfg.loc[c1, 'weight'] = 0 + dfg['weight'] = np.where(c1, 0, dfg['weight']) rows.append(dfg.copy()) dfw1 = pd.concat(rows, ignore_index=True) @@ -229,7 +231,7 @@ def __init__(self, dfw, digits=2, **kwargs) -> None: 3. 检查self.dfw中是否存在空值,如果存在则抛出ValueError异常,并提示"dfw 中存在空值,请先处理"。 4. 设置实例变量self.digits为传入的digits值。 5. 从kwargs中获取'fee_rate'参数的值,默认为0.0002,并将其保存在实例变量self.fee_rate中。 - 6. 将self.dfw中的'weight'列转换为浮点型,并保留self.digits位小数。 + 6. 将self.dfw中的 weight 列转换为浮点型,并保留self.digits位小数。 7. 提取self.dfw中的唯一交易标的符号,并将其保存在实例变量self.symbols中。 8. 执行backtest()方法进行回测,并将结果保存在实例变量self.results中。 @@ -361,8 +363,8 @@ def get_symbol_pairs(self, symbol): def __add_operate(dt, bar_id, volume, price, operate): for _ in range(abs(volume)): - op = {'bar_id': bar_id, "dt": dt, "price": price, "operate": operate} - operates.append(op) + _op = {'bar_id': bar_id, "dt": dt, "price": price, "operate": operate} + operates.append(_op) rows = dfs.to_dict(orient='records') @@ -388,12 +390,12 @@ def __add_operate(dt, bar_id, volume, price, operate): elif row2['volume'] < row1['volume']: __add_operate(row2['dt'], row2['bar_id'], row2['volume'] - row1['volume'], row2['price'], operate='开空') - elif row1['volume'] >= 0 and row2['volume'] <= 0: + elif row1['volume'] >= 0 >= row2['volume']: # 多头转换成空头对应的操作 __add_operate(row2['dt'], row2['bar_id'], row1['volume'], row2['price'], operate='平多') __add_operate(row2['dt'], row2['bar_id'], row2['volume'], row2['price'], operate='开空') - elif row1['volume'] <= 0 and row2['volume'] >= 0: + elif row1['volume'] <= 0 <= row2['volume']: # 空头转换成多头对应的操作 __add_operate(row2['dt'], row2['bar_id'], row1['volume'], row2['price'], operate='平空') __add_operate(row2['dt'], row2['bar_id'], row2['volume'], row2['price'], operate='开多') diff --git a/czsc/utils/__init__.py b/czsc/utils/__init__.py index 91e3fa7ec..3acf6a572 100644 --- a/czsc/utils/__init__.py +++ b/czsc/utils/__init__.py @@ -20,7 +20,7 @@ from .cross import CrossSectionalPerformance, cross_sectional_ranker from .stats import daily_performance, net_value_stats, subtract_fee, weekly_performance, holds_performance, top_drawdowns from .signal_analyzer import SignalAnalyzer, SignalPerformance -from .cache import home_path, get_dir_size, empty_cache_path, DiskCache, disk_cache +from .cache import home_path, get_dir_size, empty_cache_path, DiskCache, disk_cache, clear_cache from .index_composition import index_composition from .data_client import DataClient, set_url_token, get_url_token from .oss import AliyunOSS diff --git a/czsc/utils/bar_generator.py b/czsc/utils/bar_generator.py index bf6248260..5565ab382 100644 --- a/czsc/utils/bar_generator.py +++ b/czsc/utils/bar_generator.py @@ -84,18 +84,20 @@ def check_freq_and_market(time_seq: List[AnyStr], freq: Optional[AnyStr] = None) if freq in ['日线', '周线', '月线', '季线', '年线']: return freq, "默认" - if freq == '1分钟': - time_seq.extend(['14:57', '14:58', '14:59', '15:00']) - time_seq = sorted(list(set(time_seq))) assert len(time_seq) >= 2, "time_seq长度必须大于等于2" for key, tts in freq_market_times.items(): if freq and not key.startswith(freq): continue + freq_x, market = key.split("_") + + if freq_x == '1分钟': + time_seq.extend(['14:57', '14:58', '14:59', '15:00']) - if set(time_seq) == set(tts[:len(time_seq)]): - freq_x, market = key.split("_") + sub_tts = [x for x in tts if x >= min(time_seq) and x <= max(time_seq)] + if set(time_seq) == set(sub_tts): + # print(f"check_freq_and_market: {freq_x} - {market}") return freq_x, market return None, "默认" @@ -210,7 +212,7 @@ def resample_bars(df: pd.DataFrame, target_freq: Union[Freq, AnyStr], raw_bars=T base_freq = kwargs.get('base_freq', None) if target_freq.value.endswith("分钟"): - uni_times = df['dt'].head(2000).apply(lambda x: x.strftime("%H:%M")).unique().tolist() + uni_times = sorted(df['dt'].tail(2000).apply(lambda x: x.strftime("%H:%M")).unique().tolist()) _, market = check_freq_and_market(uni_times, freq=base_freq) else: market = "默认" diff --git a/czsc/utils/cache.py b/czsc/utils/cache.py index fe940a811..18dc6e762 100644 --- a/czsc/utils/cache.py +++ b/czsc/utils/cache.py @@ -44,7 +44,7 @@ class DiskCache: def __init__(self, path=None): self.path = home_path / "disk_cache" if path is None else Path(path) if self.path.is_file(): - raise Exception("path has exist") + raise Exception("path must be a directory, not a file") self.path.mkdir(parents=True, exist_ok=True) @@ -68,6 +68,7 @@ def is_found(self, k: str, suffix: str = "pkl", ttl=-1) -> bool: create_time = file.stat().st_ctime if (time.time() - create_time) > ttl: logger.info(f"缓存文件已过期, {file}") + os.remove(file) return False logger.info(f"缓存文件已找到, {file}") @@ -162,7 +163,7 @@ def remove(self, k: str, suffix: str = "pkl"): def disk_cache(path: str = home_path, suffix: str = "pkl", ttl: int = -1): """缓存装饰器,支持多种数据格式 - :param path: 缓存文件夹路径 + :param path: 缓存文件夹父路径,默认为 home_path,每个函数的缓存文件夹为 path/func_name :param suffix: 缓存文件后缀,支持 pkl, json, txt, csv, xlsx, feather, parquet :param ttl: 缓存文件有效期,单位:秒 """ @@ -191,3 +192,26 @@ def cached_func(*args, **kwargs): return cached_func return decorator + + +def clear_cache(path=home_path, subs=None, recreate=False): + """清空缓存文件夹 + + :param path: 缓存文件夹路径 + :param subs: 需要清空的子文件夹名称,如果为 None,则清空整个文件夹 + :param recreate: 是否重新创建文件夹, True 时会重新创建文件夹, False 时不会重新创建文件夹 + """ + path = Path(path) + if subs is None: + shutil.rmtree(path) + path.mkdir(parents=True, exist_ok=False) + logger.info(f"已清空缓存文件夹:{path}") + return + + for sub in subs: + fpath = path / sub + if fpath.exists(): + shutil.rmtree(fpath) + if recreate: + fpath.mkdir(parents=True, exist_ok=True) + logger.info(f"已清空缓存文件夹:{fpath}") diff --git a/czsc/utils/minites_split.feather b/czsc/utils/minites_split.feather index c2c1a0b10..7999c167b 100644 Binary files a/czsc/utils/minites_split.feather and b/czsc/utils/minites_split.feather differ diff --git a/czsc/utils/st_components.py b/czsc/utils/st_components.py index ff53a0a4a..f074b3cb3 100644 --- a/czsc/utils/st_components.py +++ b/czsc/utils/st_components.py @@ -20,6 +20,7 @@ def show_daily_return(df, **kwargs): - stat_hold_days: bool,是否展示持有日绩效指标,默认为 True - legend_only_cols: list,仅在图例中展示的列名 - use_st_table: bool,是否使用 st.table 展示绩效指标,默认为 False + - plot_cumsum: bool,是否展示日收益累计曲线,默认为 True """ if not df.index.dtype == 'datetime64[ns]': @@ -88,19 +89,21 @@ def _stats(df_, type_='持有日'): with st.expander("持有日绩效指标", expanded=False): st.dataframe(_stats(df, type_='持有日'), use_container_width=True) - df = df.cumsum() - fig = px.line(df, y=df.columns.to_list(), title="日收益累计曲线") - fig.update_xaxes(title='') + if kwargs.get("plot_cumsum", True): + df = df.cumsum() + fig = px.line(df, y=df.columns.to_list(), title="日收益累计曲线") + fig.update_xaxes(title='') - # 添加每年的开始第一个日期的竖线 - for year in range(df.index.year.min(), df.index.year.max() + 1): - first_date = df[df.index.year == year].index.min() - fig.add_vline(x=first_date, line_dash='dash', line_color='red') + # 添加每年的开始第一个日期的竖线 + for year in range(df.index.year.min(), df.index.year.max() + 1): + first_date = df[df.index.year == year].index.min() + fig.add_vline(x=first_date, line_dash='dash', line_color='red') - for col in kwargs.get("legend_only_cols", []): - fig.update_traces(visible="legendonly", selector=dict(name=col)) - - st.plotly_chart(fig, use_container_width=True) + for col in kwargs.get("legend_only_cols", []): + fig.update_traces(visible="legendonly", selector=dict(name=col)) + # fig.update_layout(legend=dict(yanchor="top", y=0.99, xanchor="left", x=0.01)) + fig.update_layout(margin=dict(l=0, r=0, b=0)) + st.plotly_chart(fig, use_container_width=True) def show_monthly_return(df, ret_col='total', title="月度累计收益", **kwargs): @@ -348,6 +351,7 @@ def show_weight_backtest(dfw, **kwargs): - fee: 单边手续费,单位为BP,默认为2BP - digits: 权重小数位数,默认为2 + - show_drawdowns: bool,是否展示最大回撤,默认为 False - show_daily_detail: bool,是否展示每日收益详情,默认为 False - show_backtest_detail: bool,是否展示回测详情,默认为 False - show_splited_daily: bool,是否展示分段日收益表现,默认为 False @@ -379,10 +383,13 @@ def show_weight_backtest(dfw, **kwargs): c10.metric("多头占比", f"{stat['多头占比']:.2%}") st.divider() - dret = wb.results['品种等权日收益'] + dret = wb.results['品种等权日收益'].copy() dret.index = pd.to_datetime(dret.index) show_daily_return(dret, legend_only_cols=dfw['symbol'].unique().tolist(), **kwargs) + if kwargs.get("show_drawdowns", False): + show_drawdowns(dret, ret_col='total', sub_title="") + if kwargs.get("show_backtest_detail", False): c1, c2 = st.columns([1, 1]) with c1.expander("品种等权日收益", expanded=False): @@ -762,6 +769,7 @@ def show_out_in_compare(df, ret_col, mid_dt, **kwargs): df = df[[ret_col]].copy().fillna(0) df.sort_index(inplace=True, ascending=True) + mid_dt = pd.to_datetime(mid_dt) dfi = df[df.index < mid_dt].copy() dfo = df[df.index >= mid_dt].copy() @@ -807,7 +815,7 @@ def show_out_in_compare(df, ret_col, mid_dt, **kwargs): '新高占比': '{:.2%}', } ) - st.dataframe(df_stats, use_container_width=True) + st.dataframe(df_stats, use_container_width=True, hide_index=True) def show_optuna_study(study: optuna.Study, **kwargs): @@ -867,8 +875,18 @@ def show_drawdowns(df, ret_col, **kwargs): dft = dft.format({'净值回撤': '{:.2%}', '回撤天数': '{:.0f}', '恢复天数': '{:.0f}'}) st.dataframe(dft, use_container_width=True) + # 画图: 净值回撤 drawdown = go.Scatter(x=df.index, y=df["drawdown"], fillcolor="red", fill='tozeroy', mode="lines", name="回测曲线") fig = go.Figure(drawdown) + + # 增加 10% 分位数线,30% 分位数线,50% 分位数线,同时增加文本标记 + for q in [0.1, 0.3, 0.5]: + y1 = df["drawdown"].quantile(q) + fig.add_hline(y=y1, line_dash="dot", line_color="green", line_width=2) + fig.add_annotation(x=df.index[-1], y=y1, text=f"{q:.1%} (DD: {y1:.2%})", showarrow=False, yshift=10) + fig.update_layout(margin=dict(l=0, r=0, t=0, b=0)) fig.update_layout(title="", xaxis_title="", yaxis_title="净值回撤", legend_title="回撤曲线") + # 限制 绘制高度 + fig.update_layout(height=300) st.plotly_chart(fig, use_container_width=True) diff --git a/examples/develop/minites_split.py b/examples/develop/minites_split.py new file mode 100644 index 000000000..c4499c5d7 --- /dev/null +++ b/examples/develop/minites_split.py @@ -0,0 +1,14 @@ +import pandas as pd + +df = pd.read_feather(r"D:\ZB\git_repo\waditu\czsc\czsc\utils\minites_split.feather") +df.to_csv(r"C:\Users\zengb\Desktop\minites_split.csv", index=False, encoding="gbk") + +# df = pd.read_feather(r"C:\Users\zengb\Desktop\minites_split_240317.feather") + +df = pd.read_csv(r"C:\Users\zengb\Desktop\minites_split.csv", encoding="gbk") +# df.to_excel(r"C:\Users\zengb\Desktop\minites_split.xlsx", index=False) +cols = [x for x in df.columns if x not in ["market"]] +for col in cols: + df[col] = pd.to_datetime(df[col]).dt.strftime("%H:%M") + +df.to_feather(r"D:\ZB\git_repo\waditu\czsc\czsc\utils\minites_split.feather") diff --git a/examples/develop/weight_backtest.py b/examples/develop/weight_backtest.py index d27593d29..8762b8c9b 100644 --- a/examples/develop/weight_backtest.py +++ b/examples/develop/weight_backtest.py @@ -1,268 +1,15 @@ # https://s0cqcxuy3p.feishu.cn/wiki/Pf1fw1woQi4iJikbKJmcYToznxb import sys sys.path.insert(0, r"D:\ZB\git_repo\waditu\czsc") - import czsc import pandas as pd -import numpy as np -import plotly.express as px -from czsc import CzscTrader -from loguru import logger -from pathlib import Path -from typing import Union, AnyStr, Callable -from czsc.utils.stats import daily_performance, evaluate_pairs czsc.welcome() -def get_ensemble_weight(trader: CzscTrader, method: Union[AnyStr, Callable] = 'mean'): - """获取 CzscTrader 中所有 positions 按照 method 方法集成之后的权重 - - :param trader: CzscTrader - 缠论交易者 - :param method: str or callable - 集成方法,可选值包括:'mean', 'max', 'min', 'vote' - 也可以传入自定义的函数,函数的输入为 dict,key 为 position.name,value 为 position.pos, 样例输入: - {'多头策略A': 1, '多头策略B': 1, '空头策略A': -1} - :param kwargs: - :return: pd.DataFrame - columns = ['dt', 'symbol', 'weight', 'price'] - """ - logger.info(f"trader positions: {[p.name for p in trader.positions]}") - - dfp = pd.DataFrame() - for p in trader.positions: - p_pos = pd.DataFrame(p.holds) - if dfp.empty: - dfp = p_pos.copy() - else: - assert dfp['dt'].equals(p_pos['dt']) - dfp = dfp.merge(p_pos[['dt', 'pos']], on='dt', how='left') - dfp.rename(columns={'pos': p.name}, inplace=True) - - pos_cols = [c for c in dfp.columns if c not in ['dt', 'weight', 'price']] - if callable(method): - dfp['weight'] = dfp[pos_cols].apply(lambda x: method(x.to_dict()), axis=1) - else: - method = method.lower() - if method == "mean": - dfp['weight'] = dfp[pos_cols].mean(axis=1) - elif method == "max": - dfp['weight'] = dfp[pos_cols].max(axis=1) - elif method == "min": - dfp['weight'] = dfp[pos_cols].min(axis=1) - elif method == "vote": - dfp['weight'] = dfp[pos_cols].apply(lambda x: np.sign(np.sum(x)), axis=1) - else: - raise ValueError(f"method {method} not supported") - - dfp['symbol'] = trader.symbol - logger.info(f"trader weight decribe: {dfp['weight'].describe().round(4).to_dict()}") - return dfp[['dt', 'symbol', 'weight', 'price']].copy() - - -class WeightBacktest: - """持仓权重回测""" - - def __init__(self, dfw, digits=2, **kwargs) -> None: - """持仓权重回测 - - :param dfw: pd.DataFrame, columns = ['dt', 'symbol', 'weight', 'price'], 持仓权重数据, - 其中 - dt 为结束时间, - symbol 为合约代码, - weight 为持仓权重, - price 为结束时间对应的交易价格,可以是当前K线的收盘价,或者下一根K线的开盘价,或者未来N根K线的TWAP、VWAP等 - - 数据样例如下: - =================== ======== ======== ======= - dt symbol weight price - =================== ======== ======== ======= - 2019-01-02 09:01:00 DLi9001 0.5 961.695 - 2019-01-02 09:02:00 DLi9001 0.25 960.72 - 2019-01-02 09:03:00 DLi9001 0.25 962.669 - 2019-01-02 09:04:00 DLi9001 0.25 960.72 - 2019-01-02 09:05:00 DLi9001 0.25 961.695 - =================== ======== ======== ======= - - :param digits: int, 权重列保留小数位数 - :param kwargs: - - fee_rate: float,单边交易成本,包括手续费与冲击成本, 默认为 0.0002 - """ - self.kwargs = kwargs - self.dfw = dfw.copy() - self.digits = digits - self.fee_rate = kwargs.get('fee_rate', 0.0002) - self.dfw['weight'] = self.dfw['weight'].round(digits) - self.symbols = list(self.dfw['symbol'].unique().tolist()) - self.res_path = Path(kwargs.get('res_path', "weight_backtest")) - self.res_path.mkdir(exist_ok=True, parents=True) - logger.add(self.res_path.joinpath("weight_backtest.log"), rotation="1 week") - logger.info(f"持仓权重回测参数:digits={digits}, fee_rate={self.fee_rate},res_path={self.res_path},kwargs={kwargs}") - - def get_symbol_daily(self, symbol): - """获取某个合约的每日收益率 - - :param symbol: str,合约代码 - :return: pd.DataFrame,品种每日收益率, - columns = ['date', 'symbol', 'edge', 'return', 'cost'] - 其中 - date 为交易日, - symbol 为合约代码, - edge 为每日收益率, - return 为每日收益率减去交易成本后的真实收益, - cost 为交易成本 - - 数据样例如下: - - ========== ======== ============ ============ ======= - date symbol edge return cost - ========== ======== ============ ============ ======= - 2019-01-02 DLi9001 0.00230261 0.00195919 0.00085 - 2019-01-03 DLi9001 0.00425589 0.00310589 0.00115 - 2019-01-04 DLi9001 -0.0014209 -0.0024709 0.00105 - 2019-01-07 DLi9001 0.000988305 -0.000111695 0.0011 - 2019-01-08 DLi9001 -0.0004743 -0.0016243 0.00115 - ========== ======== ============ ============ ======= - """ - dfs = self.dfw[self.dfw['symbol'] == symbol].copy() - dfs['edge'] = dfs['weight'] * (dfs['price'].shift(-1) / dfs['price'] - 1) - dfs['cost'] = abs(dfs['weight'].shift(1) - dfs['weight']) * self.fee_rate - dfs['edge_post_fee'] = dfs['edge'] - dfs['cost'] - daily = dfs.groupby(dfs['dt'].dt.date).agg({'edge': 'sum', 'edge_post_fee': 'sum', 'cost': 'sum'}).reset_index() - daily['symbol'] = symbol - daily.rename(columns={'edge_post_fee': 'return', 'dt': 'date'}, inplace=True) - daily = daily[['date', 'symbol', 'edge', 'return', 'cost']] - return daily - - def get_symbol_pairs(self, symbol): - """获取某个合约的开平交易记录""" - dfs = self.dfw[self.dfw['symbol'] == symbol].copy() - dfs['direction'] = 0 - dfs.loc[dfs['weight'] > 0, 'direction'] = 1 - dfs.loc[dfs['weight'] < 0, 'direction'] = -1 - dfs['volume'] = (dfs['weight'] * pow(10, self.digits)).astype(int) * dfs['direction'] - dfs['bar_id'] = list(range(1, len(dfs)+1)) - - # 根据权重变化生成开平仓记录 - operates = [] - - def __add_operate(dt, bar_id, volume, price, operate): - for _ in range(abs(volume)): - op = {'bar_id': bar_id, "dt": dt, "price": price, "operate": operate} - operates.append(op) - - rows = dfs.to_dict(orient='records') - - # 处理第一个 row - if rows[0]['volume'] > 0: - __add_operate(rows[0]['dt'], rows[0]['bar_id'], rows[0]['volume'], rows[0]['price'], operate='开多') - elif rows[0]['volume'] < 0: - __add_operate(rows[0]['dt'], rows[0]['bar_id'], rows[0]['volume'], rows[0]['price'], operate='开空') - - # 处理后续 rows - for row1, row2 in zip(rows[:-1], rows[1:]): - if row1['volume'] >= 0 and row2['volume'] >= 0: - # 多头仓位变化对应的操作 - if row2['volume'] > row1['volume']: - __add_operate(row2['dt'], row2['bar_id'], row2['volume'] - row1['volume'], row2['price'], operate='开多') - elif row2['volume'] < row1['volume']: - __add_operate(row2['dt'], row2['bar_id'], row1['volume'] - row2['volume'], row2['price'], operate='平多') - - elif row1['volume'] <= 0 and row2['volume'] <= 0: - # 空头仓位变化对应的操作 - if row2['volume'] > row1['volume']: - __add_operate(row2['dt'], row2['bar_id'], row1['volume'] - row2['volume'], row2['price'], operate='平空') - elif row2['volume'] < row1['volume']: - __add_operate(row2['dt'], row2['bar_id'], row2['volume'] - row1['volume'], row2['price'], operate='开空') - - elif row1['volume'] >= 0 and row2['volume'] <= 0: - # 多头转换成空头对应的操作 - __add_operate(row2['dt'], row2['bar_id'], row1['volume'], row2['price'], operate='平多') - __add_operate(row2['dt'], row2['bar_id'], row2['volume'], row2['price'], operate='开空') - - elif row1['volume'] <= 0 and row2['volume'] >= 0: - # 空头转换成多头对应的操作 - __add_operate(row2['dt'], row2['bar_id'], row1['volume'], row2['price'], operate='平空') - __add_operate(row2['dt'], row2['bar_id'], row2['volume'], row2['price'], operate='开多') - - pairs, opens =[], [] - for op in operates: - if op['operate'] in ['开多', '开空']: - opens.append(op) - continue - - assert op['operate'] in ['平多', '平空'] - open_op = opens.pop() - if open_op['operate'] == '开多': - p_ret = round((op['price'] - open_op['price']) / open_op['price'] * 10000, 2) - p_dir = '多头' - else: - p_ret = round((open_op['price'] - op['price']) / open_op['price'] * 10000, 2) - p_dir = '空头' - pair = {"标的代码": symbol, "交易方向": p_dir, - "开仓时间": open_op['dt'], "平仓时间": op['dt'], - "开仓价格": open_op['price'], "平仓价格": op['price'], - "持仓K线数": op['bar_id'] - open_op['bar_id'] + 1, - "事件序列": f"{open_op['operate']} -> {op['operate']}", - "持仓天数": (op['dt'] - open_op['dt']).days, - "盈亏比例": p_ret} - pairs.append(pair) - df_pairs = pd.DataFrame(pairs) - return df_pairs - - def backtest(self): - """回测所有合约的收益率""" - res = {} - for symbol in self.symbols: - daily = self.get_symbol_daily(symbol) - pairs = self.get_symbol_pairs(symbol) - res[symbol] = {"daily": daily, "pairs": pairs} - - pd.to_pickle(res, self.res_path.joinpath("res.pkl")) - logger.info(f"回测结果已保存到 {self.res_path.joinpath('res.pkl')}") - - # 品种等权费后日收益率 - dret = pd.concat([v['daily'] for v in res.values()], ignore_index=True) - dret = pd.pivot_table(dret, index='date', columns='symbol', values='return').fillna(0) - dret['total'] = dret[list(res.keys())].mean(axis=1) - logger.info(f"品种等权费后日收益率:{daily_performance(dret['total'])}") - dret.to_excel(self.res_path.joinpath("daily_return.xlsx"), index=True) - logger.info(f"品种等权费后日收益率已保存到 {self.res_path.joinpath('daily_return.xlsx')}") - - # 品种等权费后日收益率资金曲线绘制 - dret = dret.cumsum() - fig = px.line(dret, y=dret.columns.to_list(), title="费后日收益率资金曲线") - fig.for_each_trace(lambda trace: trace.update(visible=True if trace.name == 'total' else 'legendonly')) - fig.write_html(self.res_path.joinpath("daily_return.html")) - logger.info(f"费后日收益率资金曲线已保存到 {self.res_path.joinpath('daily_return.html')}") - - # 所有开平交易记录的表现 - dfp = pd.concat([v['pairs'] for v in res.values()], ignore_index=True) - pairs_stats = evaluate_pairs(dfp) - pairs_stats = {k: v for k, v in pairs_stats.items() if k in ['单笔收益', '持仓K线数', '交易胜率', '持仓天数']} - logger.info(f"所有开平交易记录的表现:{pairs_stats}") - czsc.save_json(pairs_stats, self.res_path.joinpath("pairs_stats.json")) - logger.info(f"所有开平交易记录的表现已保存到 {self.res_path.joinpath('pairs_stats.json')}") - - return res - - -def test_ensemble(): - """从单个 trader 中获取持仓权重,然后回测""" - trader = czsc.dill_load(r"D:\czsc_bi_datas\期货CTA投研\2019-01-01_2022-01-01_BE44E170\backtest_E497C9B5\traders\DLi9001.trader") - - def __ensemble_method(x): - return (x['5分钟MACD多头T0'] + x['5分钟SMA#40多头T0']) / 2 - # dfw = get_ensemble_weight(trader, method='mean') - dfw = get_ensemble_weight(trader, method=__ensemble_method) - wb = WeightBacktest(dfw, digits=1, fee_rate=0.0002, res_path=r"C:\Users\zengb\Desktop\weight_example_vote") - wb.backtest() - - def test_ensemble_weight(): """从持仓权重样例数据中回测""" dfw = pd.read_feather(r"C:\Users\zengb\Desktop\weight_example.feather") - wb = WeightBacktest(dfw, digits=1, fee_rate=0.0002, res_path=r"C:\Users\zengb\Desktop\weight_example") + wb = czsc.WeightBacktest(dfw, digits=1, fee_rate=0.0002, res_path=r"C:\Users\zengb\Desktop\weight_example") res = wb.backtest() diff --git a/examples/signals_dev/tas_macd_bc_V240307.py b/examples/signals_dev/tas_macd_bc_V240307.py new file mode 100644 index 000000000..823ca2fb6 --- /dev/null +++ b/examples/signals_dev/tas_macd_bc_V240307.py @@ -0,0 +1,78 @@ +import numpy as np +from collections import OrderedDict +from czsc.analyze import CZSC +from czsc.objects import Direction, ZS +from czsc.signals.tas import update_macd_cache +from czsc.utils import create_single_signal, get_sub_elements + + +def tas_macd_bc_V240307(c: CZSC, **kwargs) -> OrderedDict: + """MACD柱子辅助背驰判断 + + 参数模板:"{freq}_D{di}N{n}柱子背驰_BS辅助V240307" + + **信号逻辑:** + + 以顶背驰为例,最近N根K线的MACD柱子都大于0,且最近一个柱子高点小于前面的柱子高点,认为是顶背驰,做空;反之,做多。 + + **信号列表:** + + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_底背驰_第1次_任意_0') + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_底背驰_第2次_任意_0') + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_底背驰_第3次_任意_0') + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_顶背驰_第1次_任意_0') + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_顶背驰_第2次_任意_0') + - Signal('60分钟_D1N20柱子背驰_BS辅助V240307_顶背驰_第3次_任意_0') + + :param c: CZSC对象 + :param kwargs: 无 + :return: 信号识别结果 + """ + di = int(kwargs.get('di', 1)) + n = int(kwargs.get('n', 20)) + + freq = c.freq.value + k1, k2, k3 = f"{freq}_D{di}N{n}柱子背驰_BS辅助V240307".split('_') + v1, v2 = '其他', '其他' + cache_key = update_macd_cache(c) + if len(c.bars_raw) < 7 + n: + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1) + + bars = get_sub_elements(c.bars_raw, di=di, n=n) + macd = [x.cache[cache_key]['macd'] for x in bars] + n = len(macd) + + # 计算 MACD 柱子的顶和底序列 + gs = [i for i in range(1, n - 1) if macd[i - 1] < macd[i] > macd[i + 1] and macd[i] > 0] + ds = [i for i in range(1, n - 1) if macd[i - 1] > macd[i] < macd[i + 1] and macd[i] < 0] + + if macd[-1] > 0 and len(gs) >= 2 and macd[gs[-1]] < macd[gs[-2]] and gs[-1] - gs[-2] > 2: + macd_sub = macd[gs[-2]:] + # 两个顶之间的柱子没有出现大的负值 + if abs(np.sum([x for x in macd_sub if x < 0])) < np.std(np.abs(macd_sub)): + v1 = '顶背驰' + v2 = f"第{n - gs[-1] - 1}次" + + if macd[-1] < 0 and len(ds) >= 2 and macd[ds[-1]] > macd[ds[-2]] and ds[-1] - ds[-2] > 2: + macd_sub = macd[ds[-2]:] + # 两个底之间的柱子没有出现大的正值 + if abs(np.sum([x for x in macd_sub if x > 0])) < np.std(np.abs(macd_sub)): + v1 = '底背驰' + v2 = f"第{n - ds[-1] - 1}次" + + return create_single_signal(k1=k1, k2=k2, k3=k3, v1=v1, v2=v2) + + +def check(): + from czsc.connectors import research + from czsc.traders.base import check_signals_acc + + symbols = research.get_symbols('A股主要指数') + bars = research.get_raw_bars(symbols[0], '15分钟', '20181101', '20210101', fq='前复权') + + signals_config = [{'name': tas_macd_bc_V240307, 'freq': "60分钟"}] + check_signals_acc(bars, signals_config=signals_config, height='780px', delta_days=5) # type: ignore + + +if __name__ == '__main__': + check() diff --git a/test/test_bar_generator.py b/test/test_bar_generator.py index 270e55563..e89f13993 100644 --- a/test/test_bar_generator.py +++ b/test/test_bar_generator.py @@ -12,7 +12,7 @@ def test_check_freq_and_market(): time_seq = ['11:00', '15:00', '23:00', '01:00', '02:30'] - assert check_freq_and_market(time_seq) == ('120分钟', '期货') + assert check_freq_and_market(time_seq, freq='120分钟') == ('120分钟', '期货') time_seq = [ '09:31', @@ -257,8 +257,9 @@ def test_check_freq_and_market(): ] assert check_freq_and_market(time_seq, freq='1分钟') == ('1分钟', 'A股') - for key, values in freq_market_times.items(): - assert check_freq_and_market(values) == (key.split("_")[0], key.split("_")[1]) + for key, time_seq in freq_market_times.items(): + freq, market = key.split("_") + assert check_freq_and_market(time_seq, freq) == (freq, market) def test_freq_end_time(): diff --git a/test/test_features.py b/test/test_features.py index c7aed09b6..1deed1999 100644 --- a/test/test_features.py +++ b/test/test_features.py @@ -37,3 +37,37 @@ def test_rolling_tanh(): result_df = rolling_tanh(df, 'col1', new_col='col1_tanh3', window=100, min_periods=50) assert 'col1_tanh3' in result_df.columns assert result_df['col1_tanh3'].between(-1, 1).all() + + +def test_normalize_corr(): + from czsc.features.utils import normalize_corr + + np.random.seed(123) + # Create a fake DataFrame + df = pd.DataFrame({ + 'dt': pd.date_range(start='1/1/2021', periods=3000), + 'symbol': ['AAPL'] * 3000, + 'price': np.random.rand(3000), + 'factor': np.random.rand(3000), + }) + + df['n1b'] = df['price'].shift(-1) / df['price'] - 1 + raw_corr = df['n1b'].corr(df['factor']) + + # Call the function with the fake DataFrame + result = normalize_corr(df, fcol='factor', copy=True, mode='rolling', window=600) + corr1 = result['n1b'].corr(result['factor']) + assert result.shape == df.shape and np.sign(corr1) == -np.sign(raw_corr) + + # Call the function with the fake DataFrame + result = normalize_corr(df, fcol='factor', copy=True, mode='rolling', window=300) + corr1 = result['n1b'].corr(result['factor']) + assert result.shape == df.shape and np.sign(corr1) == np.sign(raw_corr) + + result = normalize_corr(df, fcol='factor', copy=True, mode='rolling', window=2000) + corr1 = result['n1b'].corr(result['factor']) + assert result.shape == df.shape and np.sign(corr1) == -np.sign(raw_corr) + + result = normalize_corr(df, fcol='factor', copy=True, mode='simple') + corr2 = result['n1b'].corr(result['factor']) + assert result.shape == df.shape and corr2 == -raw_corr