Skip to content

Commit

Permalink
TimeTriggeredBatch: added methods to create and send time triggered b…
Browse files Browse the repository at this point in the history
…atch
  • Loading branch information
Hubert Łyczek authored and HubertBubert committed Dec 14, 2018
1 parent 58795b8 commit f9eab8c
Show file tree
Hide file tree
Showing 2 changed files with 389 additions and 27 deletions.
204 changes: 177 additions & 27 deletions quedex_api/user_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pgpy

from enum import Enum

class UserStreamListener(object):
def on_ready(self):
Expand Down Expand Up @@ -150,6 +151,43 @@ def on_order_filled(self, order_filled):
"""
pass

def on_time_triggered_batch_added(self, time_triggered_batch_added):
"""
:param time_triggered_batch_added: a dict of the following format:
{
"timer_id": "<string id>",
}
"""
pass

def on_time_triggered_batch_rejected(self, time_triggered_batch_rejected):
"""
:param time_triggered_batch_rejected: a dict of the following format:
{
"timer_id": "<string id>",
"cause": "too_many_active_timers"/"timer_already_expired"/"timer_already_exists",
}
"""
pass

def on_time_triggered_batch_expired(self, time_triggered_batch_expired):
"""
:param time_triggered_batch_expired: a dict of the following format:
{
"timer_id": "<string id>",
}
"""
pass

def on_time_triggered_batch_triggered(self, time_triggered_batch_triggered):
"""
:param time_triggered_batch_triggered: a dict of the following format:
{
"timer_id": "<string id>",
}
"""
pass

def on_error(self, error):
"""
Called when an error with market stream occurs (data parsing, signature verification, webosocket
Expand Down Expand Up @@ -185,6 +223,18 @@ class UserStream(object):
class - see their comments for the format of the data.
"""

class BatchMode(Enum):
STANDARD = 1
TIME_TRIGGERED_CREATE = 2
TIME_TRIGGERED_UPDATE = 3

type_listener = {
'timer_added': 'time_triggered_batch_added',
'timer_rejected': 'time_triggered_batch_rejected',
'timer_expired': 'time_triggered_batch_expired',
'timer_triggered': 'time_triggered_batch_triggered'
}

def __init__(self, exchange, trader, nonce_group=5):
"""
:param nonce_group: value between 0 and 9, has to be different for every WebSocket connection
Expand All @@ -202,8 +252,9 @@ def __init__(self, exchange, trader, nonce_group=5):
self._nonce_group = nonce_group
self._nonce = None
self._initialized = False
self._batching = False
self._batch = None
self._batch_mode = None
self._time_triggered_batch_command = None

def add_listener(self, listener):
self._listeners.append(listener)
Expand All @@ -230,7 +281,7 @@ def place_order(self, place_order_command):
place_order_command['type'] = 'place_order'
check_place_order(place_order_command)
self._set_nonce_account_id(place_order_command)
if self._batching:
if self._batch_mode:
self._batch.append(place_order_command)
else:
self._encrypt_send(place_order_command)
Expand All @@ -246,7 +297,7 @@ def cancel_order(self, cancel_order_command):
check_cancel_order(cancel_order_command)
cancel_order_command['type'] = 'cancel_order'
self._set_nonce_account_id(cancel_order_command)
if self._batching:
if self._batch_mode:
self._batch.append(cancel_order_command)
else:
self._encrypt_send(cancel_order_command)
Expand All @@ -255,7 +306,7 @@ def cancel_all_orders(self):
self._check_if_initialized()
cancel_all_orders_command = {'type': 'cancel_all_orders'}
self._set_nonce_account_id(cancel_all_orders_command)
if self._batching:
if self._batch_mode:
self._batch.append(cancel_all_orders_command)
else:
self._encrypt_send(cancel_all_orders_command)
Expand All @@ -281,7 +332,7 @@ def modify_order(self, modify_order_command):
check_modify_order(modify_order_command)
modify_order_command['type'] = 'modify_order'
self._set_nonce_account_id(modify_order_command)
if self._batching:
if self._batch_mode:
self._batch.append(modify_order_command)
else:
self._encrypt_send(modify_order_command)
Expand All @@ -308,51 +359,146 @@ def batch(self, order_commands):
...
]
"""
self._check_if_initialized()
if len(order_commands) == 0:
raise ValueError("Empty batch")
for command in order_commands:
type = command['type']
if type == 'place_order':
check_place_order(command)
elif type == 'cancel_order':
check_cancel_order(command)
elif type == 'modify_order':
check_modify_order(command)
elif type == 'cancel_all_orders':
check_cancel_all_orders(command)
else:
raise ValueError('Unsupported command type: ' + type)
self._set_nonce_account_id(command)
self._verify_batch_commands_and_set_nonces_and_account_id(order_commands)
self._send_batch_no_checks(order_commands)

def start_batch(self):
"""
After this method is called all calls to place_order, cancel_order, modify_order result in
caching of the commands which are then sent once send_batch is called.
"""
if self._batch_mode == self.BatchMode.TIME_TRIGGERED_CREATE:
raise Exception('Cannot start another batch. Currently creating time triggered batch')
if self._batch_mode == self.BatchMode.TIME_TRIGGERED_UPDATE:
raise Exception('Cannot start another batch. Currently updating time triggered batch')
self._batch = []
self._batching = True
self._batch_mode = self.BatchMode.STANDARD

def send_batch(self):
"""
Sends batch created from calling place_order, cancel_order, modify_order after calling
start_batch.
"""
if not self._batching:
if not (self._batch_mode == self.BatchMode.STANDARD):
raise Exception('send_batch called without calling start_batch first')
if len(self._batch) == 0:
raise ValueError("Empty batch")
self._send_batch_no_checks(self._batch)
self._batch = None
self._batching = False
self._batch_mode = None

def time_triggered_batch(self, timer_id, execution_start_timestamp, execution_expiration_timestamp, order_commands):
"""
Creates timer in exchange engine with a batch of order commands. These commands will be automatically
placed, between executionStartTimestamp and executionExpirationTimestamp.
:param timer_id: a user defined timer identifier, can be used to cancel or update batch
:param execution_start_timestamp: the defined batch will not be executed before this timestamp
:param execution_expiration_timestamp: the defined batch will not be executed after this timestamp
:param order_commands: a list with a number of commands where the following are possible:
[
{
"type": "place_order",
// for the rest of the fields see place_order method
},
{
"type": "cancel_order",
// for the rest of the fields see cancel_order method
},
{
"type": "modify_order",
// for the rest of the fields see modify_order method
},
{
"type": "cancel_all_orders",
},
...
]
"""
command = {
'type': 'add_timer',
'timer_id': timer_id,
'execution_start_timestamp': execution_start_timestamp,
'execution_expiration_timestamp': execution_expiration_timestamp
}
self._set_nonce_account_id(command)
self._verify_batch_commands_and_set_nonces_and_account_id(order_commands)

self._send_time_triggered_batch_no_checks(command, order_commands)

def start_time_triggered_batch(self, timer_id, execution_start_timestamp, execution_expiration_timestamp):
"""
After this method is called all calls to place_order, cancel_order, modify_order result in
caching of the commands which are then sent once send_time_triggered_batch is called.
:param timer_id: a user defined timer identifier, can be used to cancel or update batch
:param execution_start_timestamp: the defined batch will not be executed before this timestamp
:param execution_expiration_timestamp: the defined batch will not be executed after this timestamp
"""
if self._batch_mode == self.BatchMode.STANDARD:
raise Exception('Cannot start another batch. Currently creating batch')
if self._batch_mode == self.BatchMode.TIME_TRIGGERED_UPDATE:
raise Exception('Cannot start another batch. Currently updating time triggered batch')
self._batch = []
self._batch_mode = self.BatchMode.TIME_TRIGGERED_CREATE
command = {
'type': 'add_timer',
'timer_id': timer_id,
'execution_start_timestamp': execution_start_timestamp,
'execution_expiration_timestamp': execution_expiration_timestamp
}
self._set_nonce_account_id(command)
self._time_triggered_batch_command = command

def send_time_triggered_batch(self):
"""
Sends time triggered batch created from calling place_order, cancel_order, modify_order after calling
start_time_triggered_batch, which creates timer in exchange engine with a batch of order commands.
"""
if not (self._batch_mode == self.BatchMode.TIME_TRIGGERED_CREATE):
raise Exception('Send_batch called without calling start_time_triggered_batch first')
if len(self._batch) == 0:
raise ValueError("Empty batch")
self._send_time_triggered_batch_no_checks(self._time_triggered_batch_command, self._batch)
self._batch = None
self._batch_mode = None
self._time_triggered_batch_command = None

def _send_batch_no_checks(self, order_commands):
self._encrypt_send({
self._encrypt_send(
self._create_batch_command_no_checks(order_commands)
)

def _send_time_triggered_batch_no_checks(self, batch_command, order_commands):
batch_command['command'] = self._create_batch_command_no_checks(order_commands)
self._encrypt_send(
batch_command
)

def _create_batch_command_no_checks(self, order_commands):
return {
'type': 'batch',
'account_id': self._trader.account_id,
'batch': order_commands,
})
}

def _verify_batch_commands_and_set_nonces_and_account_id(self, order_commands):
self._check_if_initialized()
if len(order_commands) == 0:
raise ValueError("Empty batch")
for command in order_commands:
type = command['type']
if type == 'place_order':
check_place_order(command)
elif type == 'cancel_order':
check_cancel_order(command)
elif type == 'modify_order':
check_modify_order(command)
elif type == 'cancel_all_orders':
check_cancel_all_orders(command)
else:
raise ValueError('Unsupported command type: ' + type)
self._set_nonce_account_id(command)

def initialize(self):
self._encrypt_send({
Expand Down Expand Up @@ -397,14 +543,18 @@ def process_data(self, message_wrapper):
continue

self._call_listeners('on_message', entity)
self._call_listeners('on_' + entity['type'], entity)
self._call_listeners('on_' + self._to_listener_name(entity['type']), entity)

def on_error(self, error):
self._call_listeners('on_error', error)

def on_disconnect(self, message):
self._call_listeners('on_disconnect', message)

def _to_listener_name(self, entity_type):
mapped_listener_name = self.type_listener.get(entity_type)
return mapped_listener_name if mapped_listener_name else entity_type

def _set_nonce_account_id(self, entity):
self._nonce += 1
entity['nonce'] = self._nonce
Expand Down
Loading

0 comments on commit f9eab8c

Please sign in to comment.