Skip to content

Commit

Permalink
refactor(http): smarter retry logic, addl methods, bugfixes
Browse files Browse the repository at this point in the history
  • Loading branch information
STRML committed Nov 1, 2017
1 parent 162470a commit db031f4
Showing 1 changed file with 123 additions and 62 deletions.
185 changes: 123 additions & 62 deletions market_maker/bitmex.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import base64
import uuid
import logging
from market_maker.auth import AccessTokenAuth, APIKeyAuthWithExpires
from market_maker.auth import APIKeyAuthWithExpires
from market_maker.utils import constants, errors
from market_maker.ws.ws_thread import BitMEXWebsocket

Expand All @@ -17,17 +17,12 @@ class BitMEX(object):

"""BitMEX API Connector."""

def __init__(self, base_url=None, symbol=None, login=None, password=None, otpToken=None,
apiKey=None, apiSecret=None, orderIDPrefix='mm_bitmex_', shouldWSAuth=True):
def __init__(self, base_url=None, symbol=None, apiKey=None, apiSecret=None,
orderIDPrefix='mm_bitmex_', shouldWSAuth=True):
"""Init connector."""
self.logger = logging.getLogger('root')
self.base_url = base_url
self.symbol = symbol
self.token = None
# User/pass auth is no longer supported
if (login or password or otpToken):
raise Exception("User/password authentication is no longer supported via the API. Please use " +
"an API key. You can generate one at https://www.bitmex.com/app/apiKeys")
if (apiKey is None):
raise Exception("Please set an API key and Secret to get started. See " +
"https://github.com/BitMEX/sample-market-maker/#getting-started for more information."
Expand All @@ -37,6 +32,7 @@ def __init__(self, base_url=None, symbol=None, login=None, password=None, otpTok
if len(orderIDPrefix) > 13:
raise ValueError("settings.ORDERID_PREFIX must be at most 13 characters long!")
self.orderIDPrefix = orderIDPrefix
self.retries = 0 # initialize counter

# Prepare HTTPS session
self.session = requests.Session()
Expand All @@ -58,19 +54,27 @@ def exit(self):
#
# Public methods
#
def ticker_data(self, symbol):
def ticker_data(self, symbol=None):
"""Get ticker data."""
if symbol is None:
symbol = self.symbol
return self.ws.get_ticker(symbol)

def instrument(self, symbol):
"""Get an instrument's details."""
return self.ws.get_instrument(symbol)

def instruments(self, filter=None):
query = {}
if filter is not None:
query['filter'] = json.dumps(filter)
return self._curl_bitmex(path='instrument', query=query, verb='GET')

def market_depth(self, symbol):
"""Get market depth / orderbook."""
return self.ws.market_depth(symbol)

def recent_trades(self, symbol):
def recent_trades(self):
"""Get recent trades.
Returns
Expand All @@ -82,19 +86,19 @@ def recent_trades(self, symbol):
u'tid': u'93842'},
"""
return self.ws.recent_trades(symbol)
return self.ws.recent_trades()

#
# Authentication required methods
#
def authentication_required(function):
def authentication_required(fn):
"""Annotation for methods that require auth."""
def wrapped(self, *args, **kwargs):
if not (self.apiKey):
msg = "You must be authenticated to use this method"
raise errors.AuthenticationError(msg)
else:
return function(self, *args, **kwargs)
return fn(self, *args, **kwargs)
return wrapped

@authentication_required
Expand All @@ -107,6 +111,20 @@ def position(self, symbol):
"""Get your open position."""
return self.ws.position(symbol)

@authentication_required
def isolate_margin(self, symbol, leverage, rethrow_errors=False):
"""Set the leverage on an isolated margin position"""
path = "position/leverage"
postdict = {
'symbol': symbol,
'leverage': leverage
}
return self._curl_bitmex(path=path, postdict=postdict, verb="POST", rethrow_errors=rethrow_errors)

@authentication_required
def delta(self):
return self.position(self.symbol)['homeNotional']

@authentication_required
def buy(self, quantity, price):
"""Place a buy order.
Expand All @@ -131,27 +149,28 @@ def place_order(self, quantity, price):

endpoint = "order"
# Generate a unique clOrdID with our prefix so we can identify it.
clOrdID = self.orderIDPrefix + base64.b64encode(uuid.uuid4().bytes).decode('utf-8').rstrip('=\n')
clOrdID = self.orderIDPrefix + base64.b64encode(uuid.uuid4().bytes).decode('utf8').rstrip('=\n')
postdict = {
'symbol': self.symbol,
'orderQty': quantity,
'price': price,
'clOrdID': clOrdID
}
return self._curl_bitmex(api=endpoint, postdict=postdict, verb="POST")
return self._curl_bitmex(path=endpoint, postdict=postdict, verb="POST")

@authentication_required
def amend_bulk_orders(self, orders):
"""Amend multiple orders."""
return self._curl_bitmex(api='order/bulk', postdict={'orders': orders}, verb='PUT')
# Note rethrow; if this fails, we want to catch it and re-tick
return self._curl_bitmex(path='order/bulk', postdict={'orders': orders}, verb='PUT', rethrow_errors=True)

@authentication_required
def create_bulk_orders(self, orders):
"""Create multiple orders."""
for order in orders:
order['clOrdID'] = self.orderIDPrefix + base64.b64encode(uuid.uuid4().bytes).decode('utf-8').rstrip('=\n')
order['clOrdID'] = self.orderIDPrefix + base64.b64encode(uuid.uuid4().bytes).decode('utf8').rstrip('=\n')
order['symbol'] = self.symbol
return self._curl_bitmex(api='order/bulk', postdict={'orders': orders}, verb='POST')
return self._curl_bitmex(path='order/bulk', postdict={'orders': orders}, verb='POST')

@authentication_required
def open_orders(self):
Expand All @@ -161,10 +180,13 @@ def open_orders(self):
@authentication_required
def http_open_orders(self):
"""Get open orders via HTTP. Used on close to ensure we catch them all."""
api = "order"
path = "order"
orders = self._curl_bitmex(
api=api,
query={'filter': json.dumps({'ordStatus.isTerminated': False, 'symbol': self.symbol})},
path=path,
query={
'filter': json.dumps({'ordStatus.isTerminated': False, 'symbol': self.symbol}),
'count': 500
},
verb="GET"
)
# Only return orders that start with our clOrdID prefix.
Expand All @@ -173,56 +195,86 @@ def http_open_orders(self):
@authentication_required
def cancel(self, orderID):
"""Cancel an existing order."""
api = "order"
path = "order"
postdict = {
'orderID': orderID,
}
return self._curl_bitmex(api=api, postdict=postdict, verb="DELETE")
return self._curl_bitmex(path=path, postdict=postdict, verb="DELETE")

@authentication_required
def withdraw(self, amount, fee, address):
api = "user/requestWithdrawal"
path = "user/requestWithdrawal"
postdict = {
'amount': amount,
'fee': fee,
'currency': 'XBt',
'address': address
}
return self._curl_bitmex(api=api, postdict=postdict, verb="POST")
return self._curl_bitmex(path=path, postdict=postdict, verb="POST", max_retries=0)

def _curl_bitmex(self, api, query=None, postdict=None, timeout=3, verb=None):
def _curl_bitmex(self, path, query=None, postdict=None, timeout=5, verb=None, rethrow_errors=False,
max_retries=None):
"""Send a request to BitMEX Servers."""
# Handle URL
url = self.base_url + api
url = self.base_url + path

# Default to POST if data is attached, GET otherwise
if not verb:
verb = 'POST' if postdict else 'GET'

# Auth: Use Access Token by default, API Key/Secret if provided
auth = AccessTokenAuth(self.token)
if self.apiKey:
auth = APIKeyAuthWithExpires(self.apiKey, self.apiSecret)
# By default don't retry POST or PUT. Retrying GET/DELETE is okay because they are idempotent.
# In the future we could allow retrying PUT, so long as 'leavesQty' is not used (not idempotent),
# or you could change the clOrdID (set {"clOrdID": "new", "origClOrdID": "old"}) so that an amend
# can't erroneously be applied twice.
if max_retries is None:
max_retries = 0 if verb in ['POST', 'PUT'] else 3

# Auth: API Key/Secret
auth = APIKeyAuthWithExpires(self.apiKey, self.apiSecret)

def exit_or_throw(e):
if rethrow_errors:
raise e
else:
exit(1)

def retry():
self.retries += 1
if self.retries > max_retries:
raise Exception("Max retries on %s (%s) hit, raising." % (path, json.dumps(postdict or '')))
return self._curl_bitmex(path, query, postdict, timeout, verb, rethrow_errors, max_retries)

# Make the request
response = None
try:
self.logger.info("sending req to %s: %s" % (url, json.dumps(postdict or query or '')))
req = requests.Request(verb, url, json=postdict, auth=auth, params=query)
prepped = self.session.prepare_request(req)
response = self.session.send(prepped, timeout=timeout)
# Make non-200s throw
response.raise_for_status()

except requests.exceptions.HTTPError as e:
# 401 - Auth error. This is fatal with API keys.
if response.status_code == 401:
self.logger.error("Login information or API Key incorrect, please check and restart.")
if response is None:
raise e

# 404, can be thrown if order canceled does not exist.
# 401 - Auth error. This is fatal.
if response.status_code == 401:
self.logger.error("API Key or Secret incorrect, please check and restart.")
self.logger.error("Error: " + response.text)
if postdict:
self.logger.error(postdict)
# Always exit, even if rethrow_errors, because this is fatal
exit(1)

# 404, can be thrown if order canceled or does not exist.
elif response.status_code == 404:
if verb == 'DELETE':
self.logger.error("Order not found: %s" % postdict['orderID'])
return
self.logger.error("Unable to contact the BitMEX API (404). ")
self.logger.error("Unable to contact the BitMEX API (404). " +
"Request: %s \n %s" % (url, json.dumps(postdict)))
exit_or_throw(e)

# 429, ratelimit; cancel orders & wait until X-Ratelimit-Reset
elif response.status_code == 429:
Expand All @@ -243,51 +295,60 @@ def _curl_bitmex(self, api, query=None, postdict=None, timeout=3, verb=None):
time.sleep(to_sleep)

# Retry the request.
return self._curl_bitmex(api, query, postdict, timeout, verb)
return retry()

# 503 - BitMEX temporary downtime, likely due to a deploy. Try again
elif response.status_code == 503:
self.logger.warning("Unable to contact the BitMEX API (503), retrying. " +
"Request: %s \n %s" % (url, json.dumps(postdict)))
time.sleep(3)
return self._curl_bitmex(api, query, postdict, timeout, verb)
return retry()

elif response.status_code == 400:
error = response.json()['error']
message = error['message'].lower()
# Duplicate clOrdID: that's fine, probably a deploy, go get the order and return it
if 'duplicate clordid' in message:
message = error['message'].lower() if error else ''

order = self._curl_bitmex('/order',
query={'filter': json.dumps({'clOrdID': postdict['clOrdID']})},
verb='GET')[0]
if (
order['orderQty'] != abs(postdict['orderQty']) or
order['side'] != ('Buy' if postdict['orderQty'] > 0 else 'Sell') or
order['price'] != postdict['price'] or
order['symbol'] != postdict['symbol']):
raise Exception('Attempted to recover from duplicate clOrdID, but order returned from API ' +
'did not match POST.\nPOST data: %s\nReturned order: %s' % (
json.dumps(postdict), json.dumps(order)))
# Duplicate clOrdID: that's fine, probably a deploy, go get the order(s) and return it
if 'duplicate clordid' in message:
orders = postdict['orders'] if 'orders' in postdict else postdict

IDs = json.dumps({'clOrdID': [order['clOrdID'] for order in orders]})
orderResults = self._curl_bitmex('/order', query={'filter': IDs}, verb='GET')

for i, order in enumerate(orderResults):
if (
order['orderQty'] != abs(postdict['orderQty']) or
order['side'] != ('Buy' if postdict['orderQty'] > 0 else 'Sell') or
order['price'] != postdict['price'] or
order['symbol'] != postdict['symbol']):
raise Exception('Attempted to recover from duplicate clOrdID, but order returned from API ' +
'did not match POST.\nPOST data: %s\nReturned order: %s' % (
json.dumps(orders[i]), json.dumps(order)))
# All good
return order
return orderResults

elif 'insufficient available balance' in message:
raise Exception('Account out of funds. The message: %s' % error['message'])
self.logger.error('Account out of funds. The message: %s' % error['message'])
exit_or_throw(Exception('Insufficient Funds'))


# If we haven't returned or re-raised yet, we get here.
self.logger.error("Error: %s: %s" % (e, response.text))
self.logger.error("Endpoint was: %s %s: %s" % (verb, api, json.dumps(postdict)))
raise e
self.logger.error("Unhandled Error: %s: %s" % (e, response.text))
self.logger.error("Endpoint was: %s %s: %s" % (verb, path, json.dumps(postdict)))
exit_or_throw(e)

except requests.exceptions.Timeout as e:
# Timeout, re-run this request
self.logger.warning("Timed out, retrying...")
return self._curl_bitmex(api, query, postdict, timeout, verb)
self.logger.warning("Timed out on request: %s (%s), retrying..." % (path, json.dumps(postdict or '')))
return retry()

except requests.exceptions.ConnectionError as e:
self.logger.warning("Unable to contact the BitMEX API (ConnectionError). Please check the URL. Retrying. " +
"Request: %s \n %s" % (url, json.dumps(postdict)))
self.logger.warning("Unable to contact the BitMEX API (%s). Please check the URL. Retrying. " +
"Request: %s %s \n %s" % (e, url, json.dumps(postdict)))
time.sleep(1)
return self._curl_bitmex(api, query, postdict, timeout, verb)
return retry()

# Reset retry counter on success
self.retries = 0

return response.json()

0 comments on commit db031f4

Please sign in to comment.