Skip to content

Commit

Permalink
1. 并行不再区分mac os和windows,统一不使用joblib了
Browse files Browse the repository at this point in the history
2. 进程中只有一个symbol回测进度条显示具体回测进度
3. 期货类添加对应查询
4. 对象序列化ump使用通用协议进行保存
5. ix to iloc
  • Loading branch information
bbfamily committed Sep 10, 2017
1 parent 179a267 commit d111c2d
Show file tree
Hide file tree
Showing 42 changed files with 722 additions and 1,485 deletions.
23 changes: 15 additions & 8 deletions abupy/AlphaBu/ABuPickTimeExecute.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ class EFitError(Enum):


def _do_pick_time_work(capital, buy_factors, sell_factors, kl_pd, benchmark, draw=False,
show_info=False):
show_info=False, show_pg=False):
"""
内部方法:包装AbuPickTimeWorker进行fit,分配错误码,通过trade_summary生成orders_pd,action_pd
:param capital: AbuCapital实例对象
Expand All @@ -54,20 +54,24 @@ def _do_pick_time_work(capital, buy_factors, sell_factors, kl_pd, benchmark, dra
:param benchmark: 交易基准对象,AbuBenchmark实例对象
:param draw: 是否绘制在对应的金融时间序列上的交易行为
:param show_info: 是否显示在整个金融时间序列上的交易结果
:param show_pg: 是否择时内部启动进度条,适合单进程或者每个进程里只有一个symbol进行择时
:return:
"""
if kl_pd is None or kl_pd.shape[0] == 0:
return None, EFitError.NET_ERROR

abu = AbuPickTimeWorker(capital, kl_pd, benchmark, buy_factors, sell_factors)
abu.fit()
pick_timer_worker = AbuPickTimeWorker(capital, kl_pd, benchmark, buy_factors, sell_factors)
if show_pg:
pick_timer_worker.enable_task_pg()
pick_timer_worker.fit()

if len(abu.orders) == 0:
if len(pick_timer_worker.orders) == 0:
# 择时金融时间序列拟合操作后,没有任何order生成
return None, EFitError.NO_ORDER_GEN

# 生成关键的orders_pd与action_pd
orders_pd, action_pd, _ = ABuTradeProxy.trade_summary(abu.orders, kl_pd, draw=draw, show_info=show_info)
orders_pd, action_pd, _ = ABuTradeProxy.trade_summary(pick_timer_worker.orders, kl_pd, draw=draw,
show_info=show_info)

# 最后生成list是因为tuple无法修改导致之后不能灵活处理
return [orders_pd, action_pd], EFitError.FIT_OK
Expand Down Expand Up @@ -101,16 +105,19 @@ def _batch_symbols_with_same_factors(p_buy_factors, p_sell_factors):
# 启动多进程进度显示AbuMulPidProgress
with AbuMulPidProgress(len(target_symbols), 'pick times complete') as progress:
for epoch, target_symbol in enumerate(target_symbols):
# 如果要绘制交易细节就不要clear了
progress.show(epoch + 1, clear=not show)

# 如果symbol只有一个就不show了,留给下面_do_pick_time_work中show_pg内部显示进度
if len(target_symbols) > 1:
# 如果要绘制交易细节就不要clear了
progress.show(epoch + 1, clear=not show)

if func_factors is not None and callable(func_factors):
# 针对do_symbols_with_diff_factors mul factors等情况嵌入可变因子
p_buy_factors, p_sell_factors = func_factors(target_symbol)
try:
kl_pd = kl_pd_manager.get_pick_time_kl_pd(target_symbol)
ret, fit_error = _do_pick_time_work(capital, p_buy_factors, p_sell_factors, kl_pd, benchmark,
draw=show, show_info=show)
draw=show, show_info=show, show_pg=len(target_symbols) == 1)
except Exception as e:
logging.exception(e)
continue
Expand Down
16 changes: 16 additions & 0 deletions abupy/AlphaBu/ABuPickTimeWorker.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from .ABuPickBase import AbuPickTimeWorkBase
# noinspection PyUnresolvedReferences
from ..CoreBu.ABuFixes import filter
from ..UtilBu.ABuProgress import AbuMulPidProgress

__author__ = '阿布'
__weixin__ = 'abu_quant'
Expand Down Expand Up @@ -60,13 +61,22 @@ def __init__(self, cap, kl_pd, benchmark, buy_factors, sell_factors):
self.filter_long_task_factors()
# 择时最终买入卖出行为列表,列表中每一个对象都为AbuOrder对象
self.orders = list()
# 择时进度条,默认空, 即不打开,不显示择时进度
self.task_pg = None

def __str__(self):
"""打印对象显示:买入因子列表+卖出因子列表"""
return 'buy_factors:{}\nsell_factors:{}'.format(self.buy_factors, self.sell_factors)

__repr__ = __str__

def enable_task_pg(self):
    """
    Turn on the internal progress bar for this pick-time task.

    Nothing happens unless self.kl_pd is set, exposes a ``name`` attribute
    and holds more than 120 rows. When those conditions hold, an
    AbuMulPidProgress sized to len(self.kl_pd) is stored on self.task_pg,
    its UI is initialised and its display_step is set to 42.
    """
    kl_pd = self.kl_pd
    # Guard clauses: no data, or data without a name to show in the label.
    if kl_pd is None or not hasattr(kl_pd, 'name'):
        return
    # Series of 120 rows or fewer do not get a progress bar.
    if len(kl_pd) <= 120:
        return

    label = 'pick {} times'.format(kl_pd.name)
    progress = AbuMulPidProgress(len(kl_pd), label)
    self.task_pg = progress
    progress.init_ui_progress()
    progress.display_step = 42

def _week_task(self, today):
"""
周任务:使用self.week_buy_factors,self.week_sell_factors进行迭代
Expand Down Expand Up @@ -116,6 +126,9 @@ def _task_loop(self, today):
:param today: 对self.kl_pd apply操作,且axis=1结果为一天的交易数据
:return:
"""
if self.task_pg is not None:
self.task_pg.show()

day_cnt = today.key
# 判断是否执行周任务
exec_week = today.week_task == 1 if g_natural_long_task else day_cnt % 5 == 0
Expand Down Expand Up @@ -164,6 +177,9 @@ def fit(self, *args, **kwargs):
# 通过pandas apply进行交易日递进择时
self.kl_pd.apply(self._task_loop, axis=1)

if self.task_pg is not None:
self.task_pg.close_ui_progress()

def init_sell_factors(self, sell_factors):
"""
通过sell_factors实例化各个卖出因子
Expand Down
8 changes: 7 additions & 1 deletion abupy/CoreBu/ABu.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ..TradeBu.ABuCapital import AbuCapital
from ..TradeBu.ABuKLManager import AbuKLManager
from ..UtilBu import ABuDateUtil
from ..UtilBu import ABuProgress

__author__ = '阿布'
__weixin__ = 'abu_quant'
Expand Down Expand Up @@ -97,18 +98,23 @@ def free_commission(trade_cnt, price):
if choice_symbols is None or len(choice_symbols) == 0:
logging.info('pick stock result is zero!')
return None, None

# kl数据管理类初始化
kl_pd_manager = AbuKLManager(benchmark, capital)
# 批量获取择时kl数据
kl_pd_manager.batch_get_pick_time_kl_pd(choice_symbols, n_process=n_process_kl)

# 在择时之前清理一下输出, 不能wait, windows上一些浏览器会卡死
ABuProgress.do_clear_output(wait=False)

# 择时策略运行,多进程方式
orders_pd, action_pd, all_fit_symbols_cnt = AbuPickTimeMaster.do_symbols_with_same_factors_process(
choice_symbols, benchmark,
buy_factors, sell_factors, capital, kl_pd_manager=kl_pd_manager, n_process_kl=n_process_kl,
n_process_pick_time=n_process_pick)

# 都完事时检测一下还有没有ui进度条
ABuProgress.do_check_process_is_dead()

# 返回namedtuple, ('orders_pd', 'action_pd', 'capital', 'benchmark')
abu_result = AbuResultTuple(orders_pd, action_pd, capital, benchmark)
# store_abu_result_tuple(abu_result, n_folds)
Expand Down
10 changes: 6 additions & 4 deletions abupy/CoreBu/ABuEnv.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
try:
# noinspection PyUnresolvedReferences
import psutil

"""有psutil,使用psutil.cpu_count计算cpu个数"""
g_cpu_cnt = psutil.cpu_count(logical=True) * 1
except ImportError:
Expand All @@ -50,6 +51,7 @@
g_cpu_cnt = os.cpu_count()
else:
import multiprocessing as mp

g_cpu_cnt = mp.cpu_count()
except:
"""cpu个数"""
Expand Down Expand Up @@ -78,9 +80,11 @@
# noinspection PyBroadException
try:
import matplotlib

matplotlib.warnings.filterwarnings('ignore')
matplotlib.warnings.simplefilter('ignore')
import sklearn

sklearn.warnings.filterwarnings('ignore')
sklearn.warnings.simplefilter('ignore')
except:
Expand All @@ -101,7 +105,6 @@
"""
root_drive = path.expanduser('~')


"""abu数据缓存主目录文件夹"""
g_project_root = path.join(root_drive, 'abu')
"""abu数据文件夹 ~/abu/data"""
Expand All @@ -115,7 +118,6 @@
"""abu项目数据主文件目录,即项目中的RomDataBu位置"""
g_project_rom_data_dir = path.join(path.dirname(path.abspath(path.realpath(__file__))), '../RomDataBu')


"""abu日志文件 ~/abu/log/info.log"""
g_project_log_info = path.join(g_project_log_dir, 'info.log')

Expand All @@ -137,6 +139,7 @@
"""
g_crawl_chrome_driver = None


# ******************** CrawlBu start ****************


Expand All @@ -162,6 +165,7 @@ class EMarketSourceType(Enum):
"""火币 比特币,莱特币"""
E_MARKET_SOURCE_hb_tc = 200


"""默认设置数据源使用E_MARKET_SOURCE_bd"""
g_market_source = EMarketSourceType.E_MARKET_SOURCE_bd

Expand Down Expand Up @@ -210,7 +214,6 @@ class EMarketSubType(Enum):
"""未上市"""
US_PREIPO = 'PREIPO'


"""港股hk"""
HK = 'hk'

Expand Down Expand Up @@ -349,7 +352,6 @@ class EDataCacheType(Enum):
"""对外版本由于用户电脑性能,存储空间且windows用户,python2用户多,所以更改默认存储类型为csv"""
g_data_cache_type = EDataCacheType.E_DATA_CACHE_CSV


"""csv模式下的存储路径"""
g_project_kl_df_data_csv = path.join(g_project_data_dir, 'csv')

Expand Down
7 changes: 6 additions & 1 deletion abupy/CoreBu/ABuParallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
__author__ = '阿布'
__weixin__ = 'abu_quant'

if ABuEnv.g_is_mac_os:
# if ABuEnv.g_is_mac_os:
if False:
"""
对外版本不再使用joblib避免python2,python3,mac, windows等joblib最后的pop一直出不来的bug,
有能力可自行打开,在mac下python3环境且cpu足够快建议打开
"""
# MAC 直接import Parallel, delayed
# noinspection PyUnresolvedReferences
from ..ExtBu.joblib import Parallel, delayed
Expand Down
3 changes: 0 additions & 3 deletions abupy/FactorBuyBu/ABuFactorBuyDemo.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,6 @@ def _init_self(self, **kwargs):
self.btc_similar_top = kwargs.pop('btc_similar_top')
# 超过多少个相关股票今天趋势相同就买入
self.btc_vote_val = kwargs.pop('btc_vote_val', 0.60)
self.pg = AbuProgress(len(self.kl_pd), 0, 'btc buy day')

def _collect_kl(sim_line):
"""在初始化中将所有相关股票的对应时间的k线数据进行收集"""
Expand All @@ -222,8 +221,6 @@ def fit_day(self, today):
:param today: 当前驱动的交易日金融时间序列数据
:return:
"""
self.pg.show()

# 忽略不符合买入的天(统计周期内前两天, 因为btc的机器学习特证需要三天交易数据)
if self.today_ind < 2:
return None
Expand Down
9 changes: 9 additions & 0 deletions abupy/MarketBu/ABuSymbolFutures.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ def __getitem__(self, key):
if key in self:
return self.futures_cn_df[key]
# key不是列名时,先按symbol列查找对应行;仍找不到再返回整个表格futures_cn_df

symbol_df = self.futures_cn_df[self.futures_cn_df.symbol == key]
if not symbol_df.empty:
return symbol_df
return self.futures_cn_df

def __setitem__(self, key, value):
Expand Down Expand Up @@ -187,6 +191,11 @@ def __getitem__(self, key):
"""索引获取:套接self.futures_gb_df[key]"""
if key in self:
return self.futures_gb_df[key]

symbol_df = self.futures_gb_df[self.futures_gb_df.symbol == key]
if not symbol_df.empty:
return symbol_df

# 不在的话,返回整个表格futures_gb_df
return self.futures_gb_df

Expand Down
4 changes: 2 additions & 2 deletions abupy/SimilarBu/ABuSimilar.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ def _handle_market_change_df(market_change_df, cmp_cnt, benchmark_df, show_cnt,
:return:
"""
# 使用[-cmp_cnt:]再次确定时间序列周期
benchmark_df = benchmark_df.ix[-cmp_cnt:]
market_change_df = market_change_df.ix[-cmp_cnt:]
benchmark_df = benchmark_df.iloc[-cmp_cnt:]
market_change_df = market_change_df.iloc[-cmp_cnt:]

if corr_type == ECoreCorrType.E_CORE_TYPE_ROLLING:
# 把参数时间加权rolling和corr_type设置进行merge
Expand Down
4 changes: 2 additions & 2 deletions abupy/TradeBu/ABuTradeProxy.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,14 +223,14 @@ def _difference(order):

def find_unique_group_symbol(order_pd):
"""
按照'buy_date', 'symbol'分组后,只筛选组里的第一个same_group.ix[0]
按照'buy_date', 'symbol'分组后,只筛选组里的第一个same_group.iloc[0]
:param order_pd:
:return:
"""

def _find_unique_group_symbol(same_group):
# 只筛选组里的第一个, 即同一个交易日,对一个股票的交易只保留一个order
return same_group.ix[0]
return same_group.iloc[0]

# 按照'buy_date', 'symbol'分组后apply same_handle
order_pds = order_pd.groupby(['buy_date', 'symbol']).apply(_find_unique_group_symbol)
Expand Down
2 changes: 1 addition & 1 deletion abupy/UmpBu/ABuUmpEdgeBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,7 +312,7 @@ def dump_clf(self):
[ 1.445, 16.266, 4.615, -1.115],
[ 1.445, 16.266, 4.615, -1.115]])
"""
ABuFileUtil.dump_pickle(df_x_dict, self.dump_file_fn())
ABuFileUtil.dump_pickle(df_x_dict, self.dump_file_fn(), how='zero')

def predict(self, **kwargs):
"""
Expand Down
2 changes: 1 addition & 1 deletion abupy/UmpBu/ABuUmpMainBase.py
Original file line number Diff line number Diff line change
Expand Up @@ -973,7 +973,7 @@ def dump_clf(self, llps=None):
}
"""
# 通过ABuFileUtil.dump_pickle将clf_cluster_dict进行序列化
ABuFileUtil.dump_pickle(clf_cluster_dict, self.dump_file_fn())
ABuFileUtil.dump_pickle(clf_cluster_dict, self.dump_file_fn(), how='zero')

def predict(self, x, need_hit_cnt=1):
"""
Expand Down
9 changes: 6 additions & 3 deletions abupy/UtilBu/ABuFileUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,22 +119,25 @@ def load_pickle(file_name):
return ret


def dump_pickle(input_obj, file_name):
def dump_pickle(input_obj, file_name, how='normal'):
"""
存贮python序列化的本地文件
:param input_obj: 需要进行序列化的对象
:param file_name: 文件名,str对象, 相对路径或者绝对路径
:param how: 序列化协议选择,默认normal不特殊处理,
zero使用python2, python3协议兼容模式,使用protocol=0,
high使用支持的最高协议
"""
ensure_dir(file_name)

print('please wait! dump_pickle....:', file_name)

try:
with open(file_name, "wb") as pick_file:
if K_SET_PICKLE_HIGHEST_PROTOCOL:
if K_SET_PICKLE_HIGHEST_PROTOCOL or how == 'high':
"""使用所支持的最高协议进行dump"""
pickle.dump(input_obj, pick_file, pickle.HIGHEST_PROTOCOL)
elif K_SET_PICKLE_ZERO_PROTOCOL:
elif K_SET_PICKLE_ZERO_PROTOCOL or how == 'zero':
"""python2, python3协议兼容模式,使用protocol=0"""
pickle.dump(input_obj, pick_file, 0)
else:
Expand Down
6 changes: 4 additions & 2 deletions abupy/UtilBu/ABuOsUtil.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,18 @@


@catch_error()
def show_msg(title, msg):
def show_msg(title, msg, log=True):
"""
统一平台弹窗信息提示,被catch_error装饰,即不应该被提示中断程序,
特别长任务的情况下
:param title: 弹窗信息标题
:param msg: 弹窗信息内容
:param log: 是否通过logging.info打印信息
:return:
"""
# 由于catch_error忽略错误,所以主要信息还是先打印
log_func('title: {} msg: {}'.format(title, msg))
if log:
log_func('title: {} msg: {}'.format(title, msg))
if ABuEnv.g_is_mac_os:
from ..UtilBu.ABuMacUtil import show_msg as do_show_msg
else:
Expand Down
Loading

0 comments on commit d111c2d

Please sign in to comment.