Skip to content

Commit

Permalink
Remove unused methods
Browse files Browse the repository at this point in the history
Remove unused methods

Remove unused import

Oops
  • Loading branch information
gmax0 committed Mar 28, 2021
1 parent 6538cb8 commit b0ac8d5
Showing 1 changed file with 1 addition and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -335,11 +334,6 @@ public Completable disconnect() {

public abstract String getSubscribeMessage(String channelName, Object... args) throws IOException;

/** Override this for use by channels that require multiple subscription messages to be sent per product */
public List<String> getSubscribeMessages(String channelName, Object... args) throws IOException {
return null;
}

public abstract String getUnsubscribeMessage(String channelName) throws IOException;

public String getSubscriptionUniqueId(String channelName, Object... args) {
Expand Down Expand Up @@ -370,29 +364,7 @@ public void sendMessage(String message) {
}
}

public void sendMessages(List<String> messages) {
if (webSocketChannel == null || !webSocketChannel.isOpen()) {
LOG.warn("WebSocket is not open! Call connect first.");
return;
}

if (!webSocketChannel.isWritable()) {
LOG.warn("Cannot send data to WebSocket as it is not writable.");
return;
}

if (messages != null && !messages.isEmpty()) {
messages.stream().forEach(message -> {
LOG.debug("Sending message: {}", message);
webSocketChannel.writeAndFlush(new TextWebSocketFrame(message));
});
} else {
LOG.warn("No messages are to be sent.");
return;
}
}

public Observable<Throwable> subscribeReconnectFailure() {
public Observable<Throwable> subscribeReconnectFailure() {
return reconnFailEmitters.share();
}

Expand Down Expand Up @@ -447,45 +419,6 @@ public Observable<T> subscribeChannel(String channelName, Object... args) {
.share();
}

public Observable<T> subscribeChannelMultipleMessages(String channelName, Object... args) {
final String channelId = getSubscriptionUniqueId(channelName, args);
LOG.info("Subscribing to channel {}", channelId);

return Observable.<T>create(
e -> {
if (webSocketChannel == null || !webSocketChannel.isOpen()) {
e.onError(new NotConnectedException());
}
channels.computeIfAbsent(
channelId,
cid -> {
Subscription newSubscription = new Subscription(e, channelName, args);
try {
sendMessages(getSubscribeMessages(channelName, args));
} catch (
Exception
throwable) { // if getSubscribeMessage throws this, it is because it
// needs to report
e.onError(throwable); // a problem creating the message
}
return newSubscription;
});
})
.doOnDispose(
() -> {
if (channels.remove(channelId) != null) {
try {
sendMessage(getUnsubscribeMessage(channelId));
} catch (IOException e) {
LOG.debug("Failed to unsubscribe channel: {} {}", channelId, e.toString());
} catch (Exception e) {
LOG.warn("Failed to unsubscribe channel: {}", channelId, e);
}
}
})
.share();
}

public void resubscribeChannels() {
for (Entry<String, Subscription> entry : channels.entrySet()) {
try {
Expand Down

0 comments on commit b0ac8d5

Please sign in to comment.