Skip to content

Commit

Permalink
Merge pull request knowm#4243 from makarid/ftx-resubscribe-after-erro…
Browse files Browse the repository at this point in the history
…r-orderbook

[FTX] Resubscribe streaming orderbook if error occurs.
  • Loading branch information
earce authored Sep 7, 2021
2 parents 562e24f + 5b0efaa commit a9816b0
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ public class FtxStreamingAdapters {

private static final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();
/** Incoming values always has 1 trailing 0 after the decimal, and start with 1 zero */
private static final ThreadLocal<DecimalFormat> dfp = ThreadLocal.withInitial(() -> new DecimalFormat("0.0#######"));
private static final ThreadLocal<DecimalFormat> dfs = ThreadLocal.withInitial(() -> new DecimalFormat("0.####E00"));
private static final ThreadLocal<DecimalFormat> dfq = ThreadLocal.withInitial(() -> new DecimalFormat("0.0#######"));

static Ticker NULL_TICKER = new Ticker.Builder().build(); // not need to create a new one each time
private static final ThreadLocal<DecimalFormat> dfp =
ThreadLocal.withInitial(() -> new DecimalFormat("0.0#######"));

private static final ThreadLocal<DecimalFormat> dfs =
ThreadLocal.withInitial(() -> new DecimalFormat("0.####E00"));
private static final ThreadLocal<DecimalFormat> dfq =
ThreadLocal.withInitial(() -> new DecimalFormat("0.0#######"));

static Ticker NULL_TICKER =
new Ticker.Builder().build(); // not need to create a new one each time

public static OrderBook adaptOrderbookMessage(
OrderBook orderBook, Instrument instrument, JsonNode jsonNode) {
Expand All @@ -51,7 +56,7 @@ public static OrderBook adaptOrderbookMessage(
try {
return mapper.readValue(res.toString(), FtxOrderbookResponse.class);
} catch (IOException e) {
throw new RuntimeException(e);
throw new IllegalStateException(e);
}
})
.forEach(
Expand Down Expand Up @@ -106,7 +111,7 @@ public static OrderBook adaptOrderbookMessage(
getOrderbookChecksum(orderBook.getAsks(), orderBook.getBids());

if (!calculatedChecksum.equals(message.getChecksum())) {
throw new RuntimeException("Checksum is not correct!");
throw new IllegalStateException("Checksum is not correct!");
}
}
});
Expand All @@ -123,7 +128,7 @@ public static Long getOrderbookChecksum(List<LimitOrder> asks, List<LimitOrder>
DecimalFormat fp = dfp.get();
DecimalFormat fs = dfs.get();
DecimalFormat fq = dfq.get();

for (int i = 0; i < 100; i++) {
if (bids.size() > i) {
BigDecimal limitPrice = bids.get(i).getLimitPrice();
Expand All @@ -145,12 +150,12 @@ public static Long getOrderbookChecksum(List<LimitOrder> asks, List<LimitOrder>
.append(":");
}
}
String s = data.toString().replace("E","e"); // strip last :

String s = data.toString().replace("E", "e"); // strip last :

CRC32 crc32 = new CRC32();
byte[] toBytes = s.getBytes(StandardCharsets.UTF_8);
crc32.update(toBytes, 0, toBytes.length-1);
crc32.update(toBytes, 0, toBytes.length - 1);

return crc32.getValue();
}
Expand Down Expand Up @@ -201,28 +206,26 @@ public static UserTrade adaptUserTrade(JsonNode jsonNode) {
JsonNode data = jsonNode.get("data");

return new UserTrade.Builder()
.currencyPair(new CurrencyPair(data.get("market").asText()))
.type(
"buy".equals(data.get("side").asText())
? Order.OrderType.BID
: Order.OrderType.ASK)
.instrument(new CurrencyPair(data.get("market").asText()))
.originalAmount(data.get("size").decimalValue())
.price(data.get("price").decimalValue())
.timestamp(Date.from(Instant.ofEpochMilli(data.get("time").asLong())))
.id(data.get("id").asText())
.orderId(data.get("orderId").asText())
.feeAmount(data.get("fee").decimalValue())
.feeCurrency(new Currency(data.get("feeCurrency").asText()))
.build();
.currencyPair(new CurrencyPair(data.get("market").asText()))
.type("buy".equals(data.get("side").asText()) ? Order.OrderType.BID : Order.OrderType.ASK)
.instrument(new CurrencyPair(data.get("market").asText()))
.originalAmount(data.get("size").decimalValue())
.price(data.get("price").decimalValue())
.timestamp(Date.from(Instant.ofEpochMilli(data.get("time").asLong())))
.id(data.get("id").asText())
.orderId(data.get("orderId").asText())
.feeAmount(data.get("fee").decimalValue())
.feeCurrency(new Currency(data.get("feeCurrency").asText()))
.build();
}

public static Order adaptOrders(JsonNode jsonNode) {
JsonNode data = jsonNode.get("data");
System.out.println(jsonNode.toPrettyString());
LimitOrder.Builder order = new LimitOrder.Builder("buy".equals(data.get("side").asText())
? Order.OrderType.BID
: Order.OrderType.ASK, new CurrencyPair(data.get("market").asText()))
LimitOrder.Builder order =
new LimitOrder.Builder(
"buy".equals(data.get("side").asText()) ? Order.OrderType.BID : Order.OrderType.ASK,
new CurrencyPair(data.get("market").asText()))
.id(data.get("id").asText())
.timestamp(Date.from(Instant.now()))
.limitPrice(data.get("price").decimalValue())
Expand All @@ -237,6 +240,5 @@ public static Order adaptOrders(JsonNode jsonNode) {
if (data.get("reduceOnly").asBoolean()) order.flag(FtxOrderFlags.REDUCE_ONLY);

return order.build();

}
}
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package info.bitrich.xchangestream.ftx;

import com.google.common.collect.Lists;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.reactivex.Observable;
import java.util.ArrayList;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
Expand All @@ -23,19 +23,37 @@ public FtxStreamingMarketDataService(FtxStreamingService service) {

@Override
public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... args) {
OrderBook orderBook = new OrderBook(null, new ArrayList<>(), new ArrayList<>());
OrderBook orderBook = new OrderBook(null, Lists.newArrayList(), Lists.newArrayList());
String channelName = "orderbook:" + FtxAdapters.adaptCurrencyPairToFtxMarket(currencyPair);

return service
.subscribeChannel("orderbook:" + FtxAdapters.adaptCurrencyPairToFtxMarket(currencyPair))
.map(res -> FtxStreamingAdapters.adaptOrderbookMessage(orderBook, currencyPair, res));
.subscribeChannel(channelName)
.map(
res -> {
try {
return FtxStreamingAdapters.adaptOrderbookMessage(orderBook, currencyPair, res);
} catch (IllegalStateException e) {
LOG.warn(
"Resubscribing {} channel after adapter error {}",
currencyPair,
e.getMessage());
orderBook.getBids().clear();
orderBook.getAsks().clear();
// Resubscribe to the channel
this.service.sendMessage(service.getUnsubscribeMessage(channelName, args));
this.service.sendMessage(service.getSubscribeMessage(channelName, args));
return new OrderBook(null, Lists.newArrayList(), Lists.newArrayList(), false);
}
})
.filter(ob -> ob.getBids().size() > 0 && ob.getAsks().size() > 0);
}

@Override
public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
return service
.subscribeChannel("ticker:" + FtxAdapters.adaptCurrencyPairToFtxMarket(currencyPair))
.map(res -> FtxStreamingAdapters.adaptTickerMessage(currencyPair, res))
.filter(ticker -> ticker != FtxStreamingAdapters.NULL_TICKER); // lets not send these backs
.filter(ticker -> ticker != FtxStreamingAdapters.NULL_TICKER); // lets not send these backs
}

@Override
Expand Down

0 comments on commit a9816b0

Please sign in to comment.