Skip to content

Commit

Permalink
Merge pull request alpacahq#231 from alpacahq/data_v2
Browse files Browse the repository at this point in the history
Update to use data v2 stream.
  • Loading branch information
camelpac authored Jun 26, 2021
2 parents a4908ab + 8055f33 commit 46cdac0
Show file tree
Hide file tree
Showing 8 changed files with 49 additions and 312 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,8 @@ $ cat config.yaml
key_id: BROKER_API_KEY
secret: BROKER_SECRET
base_url: https://paper-api.alpaca.markets
use_polygon: false
feed: iex # <== change to pro if you have a paid account
```
*note: we use the alpaca data api by default, you could change that by
setting usePolygon to true.<br>
### Usage with redis

If you are running pylivetrader in an environment with an ephemeral file store and need your context
Expand Down
4 changes: 1 addition & 3 deletions examples/MACD/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,8 @@ if you haven't done that just set them in a config.yaml file that contains:
key_id: <YOUR-API-KEY>
secret: <YOUR-SECRET-KEY>
base_url: https://paper-api.alpaca.markets
use_polygon: false
feed: iex
```
*note: we use the alpaca data api by default, you could change that by
setting usePolygon to true.<br>
execute it like so: `pylivetrader run macd_example.py
--backend-config config.yaml`

Expand Down
4 changes: 1 addition & 3 deletions examples/using_pipeline_live/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,8 @@ if you haven't done that just set them in a `config.yaml` file that contains:
key_id: <YOUR-API-KEY>
secret: <YOUR-SECRET-KEY>
base_url: https://paper-api.alpaca.markets
use_polygon: false
feed: iex
```
*note: we use the alpaca data api by default, you could change that by
setting usePolygon to true.<br>
execute it like so: `pylivetrader run pipeline_live_example.py
--backend-config config.yaml`

Expand Down
2 changes: 0 additions & 2 deletions examples/using_pipeline_live/pipeline_live_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
import pandas as pd
from pipeline_live.data.alpaca.factors import AverageDollarVolume
from pipeline_live.data.alpaca.pricing import USEquityPricing
from pipeline_live.data.polygon.fundamentals import PolygonCompany
from zipline.pipeline import Pipeline
from logbook import Logger

Expand All @@ -21,7 +20,6 @@ def initialize(context):
top5 = AverageDollarVolume(window_length=20).top(5)
pipe = Pipeline({
'close': USEquityPricing.close.latest,
'marketcap': PolygonCompany.marketcap.latest,
}, screen=top5)

# this line connects the pipeline to pylivetrader. this is done once,
Expand Down
115 changes: 44 additions & 71 deletions pylivetrader/backend/alpaca.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os

import alpaca_trade_api as tradeapi
from alpaca_trade_api import Stream
from alpaca_trade_api.rest import APIError
from alpaca_trade_api.entity import Order
from requests.exceptions import HTTPError
Expand Down Expand Up @@ -101,12 +101,14 @@ def __init__(
secret=None,
base_url=None,
api_version='v2',
use_polygon=False
feed='iex'

):
self._key_id = key_id
self._secret = secret
self._base_url = base_url
self._use_polygon = use_polygon
self._feed = feed

self._api = tradeapi.REST(
key_id, secret, base_url, api_version=api_version
)
Expand All @@ -131,19 +133,7 @@ def initialize_data(self, context):
self._open_orders[k] = v

def _get_stream(self, context):
set_context(context)
asyncio.set_event_loop(asyncio.new_event_loop())
conn = tradeapi.StreamConn(
self._key_id,
self._secret,
self._base_url,
data_url=os.environ.get("DATA_PROXY_WS", ''),
data_stream='polygon' if self._use_polygon else 'alpacadatav1'
)
channels = ['trade_updates']

@conn.on(r'trade_updates')
async def handle_trade_update(conn, channel, data):
async def handle_trade_update(data):
# Check for any pending orders
waiting_order = self._orders_pending_submission.get(
data.order['client_order_id']
Expand All @@ -167,9 +157,19 @@ async def handle_trade_update(conn, channel, data):
self._open_orders[data.order['client_order_id']] = (
self._order2zp(Order(data.order))
)

set_context(context)
asyncio.set_event_loop(asyncio.new_event_loop())
conn = Stream(self._key_id,
self._secret,
self._base_url,
data_feed=self._feed)

conn.subscribe_trade_updates(handle_trade_update)

while 1:
try:
conn.run(channels)
conn.run()
log.info("Connection reestablished")
except Exception:
from time import sleep
Expand Down Expand Up @@ -446,10 +446,7 @@ def cancel_order(self, zp_order_id):
return

def get_last_traded_dt(self, asset):
if self._use_polygon:
trade = self._api.polygon.last_trade(asset.symbol)
else:
trade = self._api.get_last_trade(asset.symbol)
trade = self._api.get_last_trade(asset.symbol)
return trade.timestamp

def get_spot_value(
Expand Down Expand Up @@ -494,18 +491,12 @@ def _get_symbols_last_trade_value(self, symbols):
"""
Query last_trade in parallel for multiple symbols and
return in dict.
symbols: list[str]
return: dict[str -> polygon.Trade or alpaca.Trade]
"""

@skip_http_error((404, 504))
def fetch(symbol):
if self._use_polygon:
return self._api.polygon.last_trade(symbol)
else:
return self._api.get_last_trade(symbol)
return self._api.get_last_trade(symbol)

return parallelize(fetch)(symbols)

Expand Down Expand Up @@ -551,6 +542,11 @@ def get_bars(self, assets, data_frequency, bar_count=500, end_dt=None):
else {assets.symbol: assets}
df.columns = df.columns.set_levels([
symbol_asset[s] for s in df.columns.levels[0]], level=0)
# try:
# df.columns = df.columns.set_levels([
# symbol_asset[s] for s in df.columns.levels[0]], level=0)
# except:
# pass
return df

def _fetch_bars_from_api(
Expand Down Expand Up @@ -589,33 +585,21 @@ def _fetch_bars_from_api(

if not (_from and to):
_from, to = self._get_from_and_to(size, limit, end_dt=to)
if self._use_polygon:
args = [{'symbols': symbol,
'_from': _from,
"to": to,
"size": size}
for symbol in symbols]
result = parallelize(self._fetch_bars_from_api_internal)(args)
if [df for df in result.values() if isinstance(df, pd.DataFrame)]:
return pd.concat(result.values(), axis=1)
else:
return pd.DataFrame([])

else:
# alpaca support get real-time data of multi stocks(<200) at once
parts = []
for i in range(0, len(symbols), ALPACA_MAX_SYMBOLS_PER_REQUEST):
part = symbols[i:i + ALPACA_MAX_SYMBOLS_PER_REQUEST]
parts.append(part)
args = [{'symbols': part,
'_from': _from,
"to": to,
"size": size,
"limit": limit} for part in parts]
result = parallelize_with_multi_process(
self._fetch_bars_from_api_internal)(args)

return pd.concat(result, axis=1)
# alpaca support get real-time data of multi stocks(<200) at once
parts = []
for i in range(0, len(symbols), ALPACA_MAX_SYMBOLS_PER_REQUEST):
part = symbols[i:i + ALPACA_MAX_SYMBOLS_PER_REQUEST]
parts.append(part)
args = [{'symbols': part,
'_from': _from,
"to": to,
"size": size,
"limit": limit} for part in parts]
# result2 = parallelize(self._fetch_bars_from_api_internal)(args)
result = parallelize_with_multi_process(
self._fetch_bars_from_api_internal)(args)

return pd.concat(result, axis=1)

def _get_from_and_to(self, size, limit, end_dt=None):
"""
Expand Down Expand Up @@ -661,22 +645,11 @@ def wrapper():
_from = params['_from']
to = params['to']
size = params['size']
if self._use_polygon:
assert isinstance(symbols, str)
symbol = str(symbols)
df = self._api.polygon.historic_agg_v2(
symbol, 1, size,
int(_from.timestamp()) * 1000,
int(to.timestamp()) * 1000
).df
df.columns = pd.MultiIndex.from_product([[symbols, ],
df.columns])
else:
df = self._api.get_barset(symbols,
size,
limit=params['limit'],
start=_from.isoformat(),
end=to.isoformat()).df[symbols]
df = self._api.get_barset(symbols,
size,
limit=params['limit'],
start=_from.isoformat(),
end=to.isoformat()).df[symbols]

# zipline -> right label
# API result -> left label (beginning of bucket)
Expand Down
1 change: 0 additions & 1 deletion pylivetrader/misc/migration_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ def add_pipelinelive_imports(code: str) -> str:
imports = """
from pipeline_live.data.alpaca.factors import AverageDollarVolume
from pipeline_live.data.alpaca.pricing import USEquityPricing
from pipeline_live.data.polygon.fundamentals import PolygonCompany
from zipline.pipeline import Pipeline
"""
if "pipeline" in code:
Expand Down
4 changes: 2 additions & 2 deletions requirements/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
statsmodels==0.11.1 # pyup: ignore # limit to work properly with zipline 1.3.0
scipy<1.6.0 # pyup: ignore - requires python >= 3.7
numpy<=1.19.4
numpy<=1.19.5 # pyup: ignore - requires python >= 3.7
pipeline-live>=0.1.12
bottleneck>=1.3
pytz>=2020.1
Expand All @@ -10,6 +10,6 @@ trading_calendars>=1.11
click>=7,<8
PyYAML>=5, <6
ipython>=7
alpaca-trade-api>=1.2.0
alpaca-trade-api>=1.2.2
pandas>=0.18.1, <=0.22.0 # pyup: ignore # limit to work properly with zipline 1.3.0
pandas-datareader<=0.8.1 # pyup: ignore # higher requires pandas>=0.23, zipline limits to 0.22
Loading

0 comments on commit 46cdac0

Please sign in to comment.