forked from aspromatis/zipline_bundle
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcsvdir.py
240 lines (197 loc) · 8.65 KB
/
csvdir.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
"""
Module for building a complete dataset from local directory with csv files.
"""
import os
import sys
from logbook import Logger, StreamHandler
from numpy import empty
from pandas import DataFrame, read_csv, Index, Timedelta, NaT
from trading_calendars import register_calendar_alias
from zipline.utils.cli import maybe_show_progress
from . import core as bundles
handler = StreamHandler(sys.stdout, format_string=" | {record.message}")
logger = Logger(__name__)
logger.handlers.append(handler)
def csvdir_equities(tframes=None, csvdir=None):
"""
Generate an ingest function for custom data bundle
This function can be used in ~/.zipline/extension.py
to register bundle with custom parameters, e.g. with
a custom trading calendar.
Parameters
----------
tframes: tuple, optional
The data time frames, supported timeframes: 'daily' and 'minute'
csvdir : string, optional, default: CSVDIR environment variable
The path to the directory of this structure:
<directory>/<timeframe1>/<symbol1>.csv
<directory>/<timeframe1>/<symbol2>.csv
<directory>/<timeframe1>/<symbol3>.csv
<directory>/<timeframe2>/<symbol1>.csv
<directory>/<timeframe2>/<symbol2>.csv
<directory>/<timeframe2>/<symbol3>.csv
Returns
-------
ingest : callable
The bundle ingest function
Examples
--------
This code should be added to ~/.zipline/extension.py
.. code-block:: python
from zipline.data.bundles import csvdir_equities, register
register('custom-csvdir-bundle',
csvdir_equities(["daily", "minute"],
'/full/path/to/the/csvdir/directory'))
"""
return CSVDIRBundle(tframes, csvdir).ingest
class CSVDIRBundle:
"""
Wrapper class to call csvdir_bundle with provided
list of time frames and a path to the csvdir directory
"""
def __init__(self, tframes=None, csvdir=None):
self.tframes = tframes
self.csvdir = csvdir
def ingest(self,
environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir):
csvdir_bundle(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir,
self.tframes,
self.csvdir)
@bundles.register("csvdir")
def csvdir_bundle(environ,
asset_db_writer,
minute_bar_writer,
daily_bar_writer,
adjustment_writer,
calendar,
start_session,
end_session,
cache,
show_progress,
output_dir,
tframes=None,
csvdir=None):
"""
Build a zipline data bundle from the directory with csv files.
"""
if not csvdir:
csvdir = environ.get('CSVDIR')
if not csvdir:
raise ValueError("CSVDIR environment variable is not set")
if not os.path.isdir(csvdir):
raise ValueError("%s is not a directory" % csvdir)
if not tframes:
tframes = set(["daily", "minute"]).intersection(os.listdir(csvdir))
if not tframes:
raise ValueError("'daily' and 'minute' directories "
"not found in '%s'" % csvdir)
divs_splits = {'divs': DataFrame(columns=['sid', 'amount',
'ex_date', 'record_date',
'declared_date', 'pay_date']),
'splits': DataFrame(columns=['sid', 'ratio',
'effective_date'])}
for tframe in tframes:
ddir = os.path.join(csvdir, tframe)
symbols = sorted(item.split('.csv')[0]
for item in os.listdir(ddir)
if '.csv' in item)
if not symbols:
raise ValueError("no <symbol>.csv* files found in %s" % ddir)
dtype = [('start_date', 'datetime64[ns]'),
('end_date', 'datetime64[ns]'),
('auto_close_date', 'datetime64[ns]'),
('symbol', 'object')]
metadata = DataFrame(empty(len(symbols), dtype=dtype))
if tframe == 'minute':
writer = minute_bar_writer
else:
writer = daily_bar_writer
writer.write(_pricing_iter(ddir, symbols, metadata,
divs_splits, show_progress),
show_progress=show_progress)
# Hardcode the exchange to "CSVDIR" for all assets and (elsewhere)
# register "CSVDIR" to resolve to the NYSE calendar, because these
# are all equities and thus can use the NYSE calendar.
metadata['exchange'] = "CSVDIR"
asset_db_writer.write(equities=metadata)
divs_splits['divs']['sid'] = divs_splits['divs']['sid'].astype(int)
divs_splits['splits']['sid'] = divs_splits['splits']['sid'].astype(int)
adjustment_writer.write(splits=divs_splits['splits'],
dividends=divs_splits['divs'])
def _pricing_iter(csvdir, symbols, metadata, divs_splits, show_progress):
with maybe_show_progress(symbols, show_progress,
label='Loading custom pricing data: ') as it:
files = os.listdir(csvdir)
# print(files) # Erol debug added
for sid, symbol in enumerate(it):
logger.debug('%s: sid %s' % (symbol, sid))
# print(sid) # Erol debug added
# print(symbol) # Erol debug added
try:
fname = [fname for fname in files
# if '%s.csv' % symbol in fname][0] # Erol: it looks like this is what is f3$%ing it up
if '%s.csv' % symbol == fname][0] # Erol: I fixed it here
# print(fname) # Erol debug added
except IndexError:
raise ValueError("%s.csv file is not in %s" % (symbol, csvdir))
dfr = read_csv(os.path.join(csvdir, fname),
parse_dates=[0],
infer_datetime_format=True,
index_col=0).sort_index()
start_date = dfr.index[0]
end_date = dfr.index[-1]
# Erol added this to sync to the official trading calendar, I'll probably just handle this on the dataside
# Check valid trading dates, according to the selected exchange calendar
# sessions = calendar.sessions_in_range(start_session, end_session)
# dfr = dfr.reindex(sessions.tz_localize(None))[start_date:end_date]
# The auto_close date is the day after the last trade.
ac_date = end_date + Timedelta(days=1)
metadata.iloc[sid] = start_date, end_date, ac_date, symbol
if 'split' in dfr.columns:
tmp = 1. / dfr[dfr['split'] != 1.0]['split']
split = DataFrame(data=tmp.index.tolist(),
columns=['effective_date'])
split['ratio'] = tmp.tolist()
split['sid'] = sid
splits = divs_splits['splits']
index = Index(range(splits.shape[0],
splits.shape[0] + split.shape[0]))
split.set_index(index, inplace=True)
divs_splits['splits'] = splits.append(split)
if 'dividend' in dfr.columns:
# ex_date amount sid record_date declared_date pay_date
tmp = dfr[dfr['dividend'] != 0.0]['dividend']
div = DataFrame(data=tmp.index.tolist(), columns=['ex_date'])
div['record_date'] = NaT
div['declared_date'] = NaT
div['pay_date'] = NaT
# Erol add a pay_date - This fixed the problem
div['pay_date'] = div['ex_date']
div['amount'] = tmp.tolist()
div['sid'] = sid
divs = divs_splits['divs']
ind = Index(range(divs.shape[0], divs.shape[0] + div.shape[0]))
div.set_index(ind, inplace=True)
divs_splits['divs'] = divs.append(div)
yield sid, dfr
register_calendar_alias("CSVDIR", "NYSE")