Skip to content

Commit

Permalink
feat(exchange.close()): have exchange.close() [ci deploy]
Browse files Browse the repository at this point in the history
* fix ts

* python fix

* python fix on_error call

* more elegante python solution

* php fixes

* fast client lint

* error hieracrchy

* lint

* lint

* eslint fixes

* latest changes

* remove logic of adding error

* fix python lint

[ci deploy]

---------

Co-authored-by: carlosmiei <[email protected]>
  • Loading branch information
pcriadoperez and carlosmiei authored Jan 4, 2024
1 parent a7bbeef commit 225bc94
Show file tree
Hide file tree
Showing 15 changed files with 304 additions and 16 deletions.
7 changes: 7 additions & 0 deletions php/ExchangeClosedByUser.php

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions php/pro/Client.php

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion php/pro/ClientTrait.php

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

86 changes: 86 additions & 0 deletions php/pro/test/base/test_close.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
<?php
namespace ccxt\pro;
include_once (__DIR__.'/../../../../ccxt.php');


use ccxt\ExchangeClosedByUser;
use React\Async;
use React\Promise;



function watch_ticker_loop ($exchange) {
return Async\async(function () use ($exchange) {
while (true) {
$ticker = Async\await ($exchange->watch_ticker('BTC/USDT'));
}
}) ();
};


function watch_order_book_for_symbols_loop($exchange) {
return Async\async(function () use ($exchange) {
while (true) {
$trades = Async\await ( $exchange->watch_trades_for_symbols(['BTC/USDT', 'ETH/USDT', 'LTC/USDT']));
}
}) ();
};


function close_after ($exchange, $s) {
return Async\async(function () use ($exchange, $s) {
Async\delay ($s);
$exchange->close();
}) ();
};


$test_close = function () {
$exchangeClass = '\\ccxt\\pro\\binance';
$exchange = new $exchangeClass();
$exchange->verbose = false;

// --------------------------------------------

var_dump('Testing exchange.close(): No future awaiting, should close with no errors');
Async\await ($exchange->watch_ticker('BTC/USD'));
var_dump('ticker received');
$exchange->close();
var_dump('PASSED - exchange closed with no errors');

// --------------------------------------------

var_dump('Testing exchange.close(): Awaiting future should throw ClosedByUser');
try {
Async\await(Promise\all([
close_after($exchange, 5),
watch_ticker_loop($exchange)
]));
} catch(\Throwable $e) {
if ($e instanceof ExchangeClosedByUser) {
var_dump('PASSED - future rejected with ClosedByUser');
} else {
throw $e;
}
}

// --------------------------------------------

var_dump('Test exchange.close(): Call watch_multiple unhandled futures are canceled');
try {
Async\await(Promise\all([
close_after($exchange, 5),
watch_order_book_for_symbols_loop($exchange)
]));
} catch(\Throwable $e) {
if ($e instanceof ExchangeClosedByUser) {
var_dump('PASSED - future rejected with ClosedByUser');
} else {
throw $e;
}
}
exit(0);
};


\React\Async\coroutine($test_close);
2 changes: 1 addition & 1 deletion python/ccxt/async_support/base/exchange.py
Original file line number Diff line number Diff line change
Expand Up @@ -488,7 +488,7 @@ def on_error(self, client, error):

def on_close(self, client, error):
if client.error:
# connection closed due to an error
# connection closed by the user or due to an error
pass
else:
# server disconnected a working connection
Expand Down
11 changes: 9 additions & 2 deletions python/ccxt/async_support/base/ws/aiohttp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from .functions import milliseconds, iso8601, is_json_encoded_object
from ccxt.async_support.base.ws.client import Client
from ccxt.async_support.base.ws.functions import gunzip, inflate
from ccxt import NetworkError, RequestTimeout
from ccxt import NetworkError, RequestTimeout, ExchangeClosedByUser


class AiohttpClient(Client):
Expand Down Expand Up @@ -92,7 +92,14 @@ async def close(self, code=1000):
if self.ping_looper:
self.ping_looper.cancel()
if self.receive_looper:
self.receive_looper.cancel()
self.receive_looper.cancel() # cancel all pending futures stored in self.futures
for key in self.futures:
future = self.futures[key]
if not future.done():
if future.is_race_future:
future.cancel() # this is an "internal" future so we want to cancel it silently
else:
future.reject(ExchangeClosedByUser('Connection closed by the user'))

async def ping_loop(self):
if self.verbose:
Expand Down
7 changes: 5 additions & 2 deletions python/ccxt/async_support/base/ws/fast_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import asyncio
import socket
import collections
from ccxt import NetworkError
from ccxt.async_support.base.ws.aiohttp_client import AiohttpClient


Expand Down Expand Up @@ -36,7 +35,10 @@ def feed_data(message, size):
self.stack.append(message)

def feed_eof():
self.on_error(NetworkError(1006))
if self._close_code == 1000: # OK close
self.on_close(1000)
else:
self.on_error(1006) # ABNORMAL_CLOSURE

def wrapper(func):
def parse_frame(buf):
Expand All @@ -56,6 +58,7 @@ async def close(code=1000, message=b''):
try:
await _self._writer.close(code, message)
_self._response.close()
self._close_code = 1000
except asyncio.CancelledError:
_self._response.close()
_self._close_code = 1006
Expand Down
15 changes: 14 additions & 1 deletion python/ccxt/async_support/base/ws/future.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import asyncio
from ccxt import ExchangeClosedByUser


class Future(asyncio.Future):

is_race_future = False

def resolve(self, result=None):
if not self.done():
self.set_result(result)
Expand All @@ -14,6 +17,8 @@ def reject(self, error=None):
@classmethod
def race(cls, futures):
future = Future()
for f in futures:
f.is_race_future = True
coro = asyncio.wait(futures, return_when=asyncio.FIRST_COMPLETED)
task = asyncio.create_task(coro)

Expand All @@ -24,6 +29,8 @@ def callback(done):
# check for exceptions
exceptions = []
for f in complete:
if f._state == 'CANCELLED':
continue # was canceled internally
err = f.exception()
if err:
exceptions.append(err)
Expand All @@ -33,7 +40,13 @@ def callback(done):
return
# else return first result
else:
first_result = list(complete)[0].result()
futures_list = list(complete)
are_all_canceled = all(f._state == 'CANCELLED' for f in futures_list)
if are_all_canceled and future._state == 'PENDING':
future.set_exception(ExchangeClosedByUser('Connection closed by the user'))
return
first = futures_list[0]
first_result = first.result()
future.set_result(first_result)
else:
future.set_exception(exception)
Expand Down
11 changes: 8 additions & 3 deletions python/ccxt/base/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,9 @@
},
'InvalidNonce': {},
'RequestTimeout': {},
},
}
},
'ExchangeClosedByUser': {}
},
}

Expand Down Expand Up @@ -160,7 +161,6 @@ class ProxyError(ExchangeError):
class OperationFailed(BaseError):
pass


class NetworkError(OperationFailed):
pass

Expand Down Expand Up @@ -188,6 +188,10 @@ class InvalidNonce(NetworkError):
class RequestTimeout(NetworkError):
pass

class ExchangeClosedByUser(BaseError):
pass



__all__ = [
'error_hierarchy',
Expand Down Expand Up @@ -225,5 +229,6 @@ class RequestTimeout(NetworkError):
'ExchangeNotAvailable',
'OnMaintenance',
'InvalidNonce',
'RequestTimeout'
'RequestTimeout',
'ExchangeClosedByUser'
]
64 changes: 64 additions & 0 deletions python/ccxt/pro/test/base/test_close.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import os
import sys

root = os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))))
sys.path.append(root)

from asyncio import run, sleep, gather
from ccxt.base.errors import ExchangeClosedByUser # noqa E402
import ccxt.pro

async def watch_ticker_loop(exchange):
while True:
ticker = await exchange.watch_ticker('BTC/USDT')
print('ticker received')


async def watch_order_book_for_symbols_loop(exchange):
while True:
trades = await exchange.watch_trades_for_symbols(['BTC/USDT', 'ETH/USDT', 'LTC/USDT'])
print('trades received')


async def close_after(exchange, ms):
await sleep(ms)
await exchange.close()


async def test_close():
exchange = ccxt.pro.binance()
# exchange.verbose = True
# --------------------------------------------
print('Testing exchange.close(): No future awaiting, should close with no errors')
await exchange.watch_ticker('BTC/USD')
print('ticker received')
await exchange.close()
print('PASSED - exchange closed with no errors')
# --------------------------------------------
print('Testing exchange.close(): Open watch multiple, resolve, should close with no errors')
await exchange.watch_trades_for_symbols(['BTC/USD', 'ETH/USD', 'LTC/USD'])
print('ticker received')
await exchange.close()
print('PASSED - exchange closed with no errors')
# --------------------------------------------
print('Testing exchange.close(): Awaiting future should throw ClosedByUser')
try:
await gather(close_after(exchange, 5), watch_ticker_loop(exchange))
except Exception as e:
if isinstance(e, ExchangeClosedByUser):
print('PASSED - future rejected with ClosedByUser')
else:
raise e
# --------------------------------------------
print('Test exchange.close(): Call watch_multiple unhandled futures are canceled')
try:
await gather(close_after(exchange, 5), watch_order_book_for_symbols_loop(exchange))
except Exception as e:
if isinstance(e, ExchangeClosedByUser):
print('PASSED - future rejected with ClosedByUser')
else:
raise e
exit(0)


run(test_close())
12 changes: 9 additions & 3 deletions ts/src/base/Exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,8 @@ import {
, ExchangeNotAvailable
, ArgumentsRequired
, RateLimitExceeded,
BadRequest} from "./errors.js"
BadRequest,
ExchangeClosedByUser} from "./errors.js"

import { Precise } from './Precise.js'

Expand Down Expand Up @@ -1502,10 +1503,15 @@ export default class Exchange {
const closedClients = [];
for (let i = 0; i < clients.length; i++) {
const client = clients[i] as WsClient;
delete this.clients[client.url];
client.error = new ExchangeClosedByUser (this.id + ' closedByUser');
closedClients.push(client.close ());
}
return Promise.all (closedClients);
await Promise.all (closedClients);
for (let i = 0; i < clients.length; i++) {
const client = clients[i] as WsClient;
delete this.clients[client.url];
}
return;
}

async loadOrderBook (client, messageHash, symbol, limit = undefined, params = {}) {
Expand Down
1 change: 1 addition & 0 deletions ts/src/base/errorHierarchy.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ const errorHierarchy = {
'RequestTimeout': {},
},
},
'ExchangeClosedByUser': {},
},
};

Expand Down
Loading

0 comments on commit 225bc94

Please sign in to comment.