Skip to content

Commit

Permalink
Gate.io V4 Websocket Implementation (Trades + Orderbook)
Browse files Browse the repository at this point in the history
  • Loading branch information
gmax0 committed Jun 1, 2021
1 parent e79e1b7 commit 44f8bf6
Show file tree
Hide file tree
Showing 14 changed files with 726 additions and 0 deletions.
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,7 @@
<module>xchange-stream-core</module>
<module>xchange-stream-dydx</module>
<module>xchange-stream-ftx</module>
<module>xchange-stream-gateio</module>
<module>xchange-stream-gemini</module>
<module>xchange-stream-gemini-v2</module>
<module>xchange-stream-hitbtc</module>
Expand Down
32 changes: 32 additions & 0 deletions xchange-stream-gateio/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>xchange-parent</artifactId>
<groupId>org.knowm.xchange</groupId>
<version>5.0.9-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<name>XChange GateIO Stream</name>
<artifactId>xchange-stream-gateio</artifactId>

<dependencies>
<dependency>
<groupId>org.knowm.xchange</groupId>
<artifactId>xchange-stream-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.knowm.xchange</groupId>
<artifactId>xchange-gateio</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${version.lombok}</version>
</dependency>
</dependencies>
</project>
54 changes: 54 additions & 0 deletions xchange-stream-gateio/src/main/java/GateioStreamingExchange.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.core.StreamingExchange;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.reactivex.Completable;
import org.knowm.xchange.gateio.GateioExchange;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Author: Max Gao ([email protected]) Created: 05-05-2021 */
public class GateioStreamingExchange extends GateioExchange implements StreamingExchange {
private static final Logger LOG = LoggerFactory.getLogger(GateioStreamingExchange.class);

private final String V4_URL = "wss://api.gateio.ws/ws/v4/";

private GateioStreamingService streamingService;
private StreamingMarketDataService streamingMarketDataService;

public GateioStreamingExchange() {}

@Override
public Completable connect(ProductSubscription... args) {
if (args == null || args.length == 0)
throw new UnsupportedOperationException("The ProductSubscription must be defined!");

this.streamingService = new GateioStreamingService(V4_URL);
this.streamingMarketDataService = new GateioStreamingMarketDataService(streamingService);

streamingService.subscribeMultipleCurrencyPairs(args);
return streamingService.connect();
}

@Override
public Completable disconnect() {
GateioStreamingService service = streamingService;
streamingService = null;
streamingMarketDataService = null;
return service.disconnect();
}

@Override
public StreamingMarketDataService getStreamingMarketDataService() {
return streamingMarketDataService;
}

@Override
public boolean isAlive() {
return streamingService != null && streamingService.isSocketOpen();
}

@Override
public void useCompressedMessages(boolean compressedMessages) {
streamingService.useCompressedMessages(compressedMessages);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import dto.response.GateioOrderBookResponse;
import dto.response.GateioTradesResponse;
import info.bitrich.xchangestream.core.StreamingMarketDataService;
import io.reactivex.Observable;
import org.knowm.xchange.currency.CurrencyPair;
import org.knowm.xchange.dto.marketdata.OrderBook;
import org.knowm.xchange.dto.marketdata.Ticker;
import org.knowm.xchange.dto.marketdata.Trade;
import org.knowm.xchange.exceptions.NotYetImplementedForExchangeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/** Author: Max Gao ([email protected]) Created: 05-05-2021 */
public class GateioStreamingMarketDataService implements StreamingMarketDataService {
private static final Logger LOGGER =
LoggerFactory.getLogger(GateioStreamingMarketDataService.class);
private static final int MAX_DEPTH_DEFAULT = 5;
private static final int UPDATE_INTERVAL_DEFAULT = 100;

private final GateioStreamingService service;

public GateioStreamingMarketDataService(GateioStreamingService service) {
this.service = service;
}

private boolean containsPair(List<CurrencyPair> pairs, CurrencyPair pair) {
return pairs.stream().anyMatch(p -> p.equals(pair));
}

/**
* Uses the limited-level snapshot method:
* https://www.gate.io/docs/apiv4/ws/index.html#limited-level-full-order-book-snapshot
*
* @param currencyPair Currency pair of the order book
* @param args Optional maxDepth, Optional msgInterval
* @return
*/
@Override
public Observable<OrderBook> getOrderBook(CurrencyPair currencyPair, Object... args) {
if (!containsPair(service.getProduct().getOrderBook(), currencyPair))
throw new UnsupportedOperationException(
String.format("The currency pair %s is not subscribed for orderbook", currencyPair));

final int maxDepth =
(args.length > 0 && args[0] instanceof Number)
? ((Number) args[0]).intValue()
: MAX_DEPTH_DEFAULT;
final int msgInterval =
(args.length > 1 && args[1] instanceof Number)
? ((Number) args[1]).intValue()
: UPDATE_INTERVAL_DEFAULT;

return service
.getRawWebSocketTransactions(
currencyPair,
GateioStreamingService.SPOT_ORDERBOOK_CHANNEL,
new Integer(msgInterval),
new Integer(maxDepth))
.map(msg -> ((GateioOrderBookResponse) msg).toOrderBook(currencyPair));
}

@Override
public Observable<Ticker> getTicker(CurrencyPair currencyPair, Object... args) {
throw new NotYetImplementedForExchangeException("Not yet implemented!");
}

@Override
public Observable<Trade> getTrades(CurrencyPair currencyPair, Object... args) {
return service
.getRawWebSocketTransactions(currencyPair, GateioStreamingService.SPOT_TRADES_CHANNEL)
.map(msg -> ((GateioTradesResponse) msg).toTrade());
}
}
144 changes: 144 additions & 0 deletions xchange-stream-gateio/src/main/java/GateioStreamingService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import dto.GateioWebSocketSubscriptionMessage;
import dto.response.GateioOrderBookResponse;
import dto.response.GateioTradesResponse;
import dto.response.GateioWebSocketTransaction;
import info.bitrich.xchangestream.core.ProductSubscription;
import info.bitrich.xchangestream.service.netty.JsonNettyStreamingService;
import info.bitrich.xchangestream.service.netty.StreamingObjectMapperHelper;
import info.bitrich.xchangestream.service.netty.WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler;
import io.netty.handler.codec.http.websocketx.extensions.WebSocketClientExtensionHandler;
import io.reactivex.Observable;
import org.knowm.xchange.currency.CurrencyPair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Author: Max Gao ([email protected]) Created: 05-05-2021 */
public class GateioStreamingService extends JsonNettyStreamingService {
private static final Logger LOG = LoggerFactory.getLogger(GateioStreamingService.class);
private static final String SUBSCRIBE = "subscribe";
private static final String UNSUBSCRIBE = "unsubscribe";
private static final String CHANNEL_NAME_DELIMITER = "-";

public static final String SPOT_ORDERBOOK_CHANNEL = "spot.order_book";
public static final String SPOT_TRADES_CHANNEL = "spot.trades";
public static final String SPOT_TICKERS_CHANNEL = "spot.tickers";

private final String apiUri;
private ProductSubscription productSubscription;

private final Map<String, Observable<JsonNode>> subscriptions = new ConcurrentHashMap<>();

public GateioStreamingService(String apiUri) {
super(apiUri, Integer.MAX_VALUE);
this.apiUri = apiUri;
}

public Observable<GateioWebSocketTransaction> getRawWebSocketTransactions(
CurrencyPair currencyPair, String channelName, Object... args) {
final ObjectMapper mapper = StreamingObjectMapperHelper.getObjectMapper();

return subscribeChannel(channelName, currencyPair, args)
.map(
msg -> {
switch (channelName) {
case SPOT_ORDERBOOK_CHANNEL:
return mapper.readValue(msg.toString(), GateioOrderBookResponse.class);
case SPOT_TRADES_CHANNEL:
return mapper.readValue(msg.toString(), GateioTradesResponse.class);
}
return mapper.readValue(msg.toString(), GateioWebSocketTransaction.class);
})
.filter(t -> currencyPair.equals(t.getCurrencyPair()));
}

public void subscribeMultipleCurrencyPairs(ProductSubscription... products) {
this.productSubscription = products[0];
}

public ProductSubscription getProduct() {
return this.productSubscription;
}

@Override
protected String getChannelNameFromMessage(JsonNode message) {
String channel = message.path("channel") != null ? message.path("channel").asText() : "";
String currencyPairOrderBook =
message.path("result").path("s") != null ? message.path("result").path("s").asText() : "";
String currencyPairTradesTickers =
message.path("result").path("currency_pair") != null
? message.path("result").path("currency_pair").asText()
: "";

return new StringBuilder(channel)
.append(CHANNEL_NAME_DELIMITER)
.append(currencyPairOrderBook)
.append(currencyPairTradesTickers)
.toString();
}

@Override
public Observable<JsonNode> subscribeChannel(String channelName, Object... args) {
final CurrencyPair currencyPair =
(args.length > 0 && args[0] instanceof CurrencyPair) ? ((CurrencyPair) args[0]) : null;

String currencyPairChannelName =
String.format("%s-%s", channelName, currencyPair.toString().replace('/', '_'));

// Example channel name key: spot.order_book_update-ETH_USDT, spot.trades-BTC_USDT
if (!channels.containsKey(currencyPairChannelName)
&& !subscriptions.containsKey(currencyPairChannelName)) {
subscriptions.put(
currencyPairChannelName, super.subscribeChannel(currencyPairChannelName, args));
}

return subscriptions.get(currencyPairChannelName);
}

/**
* Returns a JSON String containing the subscription message.
*
* @param channelName
* @param args CurrencyPair to subscribe to, followed by the refresh interval (Integer), followed
* by the depth (Integer, only applicable for OrderBook channel subscriptions)
* @return
* @throws IOException
*/
@Override
public String getSubscribeMessage(String channelName, Object... args) throws IOException {
final CurrencyPair currencyPair =
(args.length > 0 && args[0] instanceof CurrencyPair) ? ((CurrencyPair) args[0]) : null;
final Object[] argz =
(args.length > 1 && args[1] instanceof Object[]) ? (Object[]) args[1] : null;
final Integer interval =
(argz != null && argz.length > 0 && argz[0] instanceof Integer)
? ((Integer) argz[0])
: null;
final Integer depth =
(argz != null && argz.length > 1 && argz[1] instanceof Integer)
? ((Integer) argz[1])
: null;

String baseChannelName = channelName.split(CHANNEL_NAME_DELIMITER)[0];

GateioWebSocketSubscriptionMessage subscribeMessage =
new GateioWebSocketSubscriptionMessage(baseChannelName, currencyPair, interval, depth);

return objectMapper.writeValueAsString(subscribeMessage);
}

@Override
protected WebSocketClientExtensionHandler getWebSocketClientExtensionHandler() {
return WebSocketClientCompressionAllowClientNoContextAndServerNoContextHandler.INSTANCE;
}

@Override
public String getUnsubscribeMessage(String channelName) throws IOException {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package dto;

import com.fasterxml.jackson.annotation.JsonProperty;
import org.knowm.xchange.currency.CurrencyPair;

import java.time.Instant;
import java.util.Arrays;
import java.util.Objects;
import java.util.stream.Collectors;

/** Author: Max Gao ([email protected]) Created: 05-05-2021 */
public class GateioWebSocketSubscriptionMessage {
private static final String SUBSCRIBE = "subscribe";

@JsonProperty("time")
private int time;

@JsonProperty("channel")
private String channel;

@JsonProperty("event")
private String event;

@JsonProperty("payload")
private String[] payload;

public GateioWebSocketSubscriptionMessage(
String channelName, CurrencyPair currencyPair, Integer interval) {}

public GateioWebSocketSubscriptionMessage(
String channelName, CurrencyPair currencyPair, Integer interval, Integer depth) {
this.time = (int) (Instant.now().getEpochSecond());
this.channel = channelName;
this.event = SUBSCRIBE;
this.payload =
Arrays.asList(
currencyPair.toString().replace('/', '_'),
depth != null ? Integer.toString(depth) : null,
interval != null ? interval + "ms" : null)
.stream()
.filter(Objects::nonNull)
.collect(Collectors.toList())
.toArray(new String[] {});
}
}
Loading

0 comments on commit 44f8bf6

Please sign in to comment.