Skip to content

Commit

Permalink
Merge pull request #38 from JavaSaBr/feature-broker-23
Browse files Browse the repository at this point in the history
Feature broker 23: Shared subscriptions
  • Loading branch information
JavaSaBr authored Jun 18, 2020
2 parents b2e5f2d + 4b0db33 commit 6d16b4e
Show file tree
Hide file tree
Showing 46 changed files with 856 additions and 302 deletions.
10 changes: 5 additions & 5 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ allprojects {
apply plugin: "groovy"
apply plugin: "org.springframework.boot"

sourceCompatibility = JavaVersion.VERSION_13
targetCompatibility = JavaVersion.VERSION_13
sourceCompatibility = JavaVersion.VERSION_14
targetCompatibility = JavaVersion.VERSION_14

repositories {
jcenter()
Expand All @@ -38,8 +38,8 @@ allprojects {
springVersion = '5.1.6.RELEASE'
junitJupiterVersion = "5.5.2"
testcontainersVersion = "1.12.1"
groovyVersion = "2.5.8"
spockVersion = "1.2-groovy-2.5"
groovyVersion = "3.0.4"
spockVersion = "2.0-M3-groovy-3.0"
projectReactorVersion = "3.3.0.RELEASE"
byteBuddyVersion = "1.10.2"
objenesisVersion = "3.1"
Expand Down Expand Up @@ -118,7 +118,7 @@ task buildSingleArtifactWithoutTests(type: GradleBuild) {
}

wrapper {
gradleVersion = '6.0'
gradleVersion = '6.6-milestone-1'
distributionType = Wrapper.DistributionType.ALL
}

Expand Down
Binary file modified gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.0-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-6.6-milestone-1-all.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
2 changes: 2 additions & 0 deletions gradlew
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ esac

CLASSPATH=$APP_HOME/gradle/wrapper/gradle-wrapper.jar


# Determine the Java command to use to start the JVM.
if [ -n "$JAVA_HOME" ] ; then
if [ -x "$JAVA_HOME/jre/sh/java" ] ; then
Expand Down Expand Up @@ -129,6 +130,7 @@ fi
if [ "$cygwin" = "true" -o "$msys" = "true" ] ; then
APP_HOME=`cygpath --path --mixed "$APP_HOME"`
CLASSPATH=`cygpath --path --mixed "$CLASSPATH"`

JAVACMD=`cygpath --unix "$JAVACMD"`

# We build the pattern for arguments to be converted via cygpath
Expand Down
4 changes: 4 additions & 0 deletions gradlew.bat
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ if "%DIRNAME%" == "" set DIRNAME=.
set APP_BASE_NAME=%~n0
set APP_HOME=%DIRNAME%

@rem Resolve any "." and ".." in APP_HOME to make it shorter.
for %%i in ("%APP_HOME%") do set APP_HOME=%%~fi

@rem Add default JVM options here. You can also use JAVA_OPTS and GRADLE_OPTS to pass JVM options to this script.
set DEFAULT_JVM_OPTS="-Xmx64m" "-Xms64m"

Expand Down Expand Up @@ -81,6 +84,7 @@ set CMD_LINE_ARGS=%*

set CLASSPATH=%APP_HOME%\gradle\wrapper\gradle-wrapper.jar


@rem Execute Gradle
"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -classpath "%CLASSPATH%" org.gradle.wrapper.GradleWrapperMain %CMD_LINE_ARGS%

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package com.ss.mqtt.broker.config;

import com.ss.mqtt.broker.handler.client.MqttClientReleaseHandler;
import com.ss.mqtt.broker.handler.packet.in.*;
import com.ss.mqtt.broker.handler.packet.in.PacketInHandler;
import com.ss.mqtt.broker.model.MqttPropertyConstants;
import com.ss.mqtt.broker.model.QoS;
import com.ss.mqtt.broker.network.MqttConnection;
import com.ss.mqtt.broker.network.client.ExternalMqttClient;
import com.ss.mqtt.broker.network.client.InternalMqttClient;
import com.ss.mqtt.broker.network.client.MqttClient;
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
import com.ss.rlib.network.BufferAllocator;
import com.ss.rlib.network.Network;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ public abstract class AbstractPacketHandler<C extends UnsafeMqttClient, R extend

@Override
public void handle(@NotNull UnsafeMqttClient client, @NotNull MqttReadablePacket packet) {
//noinspection unchecked
handleImpl((C) client, (R) packet);
}

protected abstract void handleImpl(@NotNull C client, @NotNull R packet);
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,6 @@ protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull DisconnectI
log.error("Disconnect client {} by error reason {}", client, reasonCode);
}

client.release().subscribe();
client.getConnection().close();
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,37 @@
package com.ss.mqtt.broker.handler.packet.in;

import static com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode.SHARED_SUBSCRIPTIONS_NOT_SUPPORTED;
import static com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode.WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED;
import static java.lang.Byte.toUnsignedInt;
import com.ss.mqtt.broker.model.reason.code.DisconnectReasonCode;
import com.ss.mqtt.broker.model.reason.code.SubscribeAckReasonCode;
import com.ss.mqtt.broker.network.client.MqttClient.UnsafeMqttClient;
import com.ss.mqtt.broker.network.packet.in.SubscribeInPacket;
import com.ss.mqtt.broker.service.SubscriptionService;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;

import java.util.Set;

@RequiredArgsConstructor
public class SubscribeInPacketHandler extends AbstractPacketHandler<UnsafeMqttClient, SubscribeInPacket> {

private final static Set<SubscribeAckReasonCode> INVALID_ACK_CODE = Set.of(
SHARED_SUBSCRIPTIONS_NOT_SUPPORTED,
WILDCARD_SUBSCRIPTIONS_NOT_SUPPORTED
);

private final @NotNull SubscriptionService subscriptionService;

@Override
protected void handleImpl(@NotNull UnsafeMqttClient client, @NotNull SubscribeInPacket packet) {
var ackReasonCodes = subscriptionService.subscribe(client, packet.getTopicFilters());
client.send(client.getPacketOutFactory().newSubscribeAck(packet.getPacketId(), ackReasonCodes));
var reason = ackReasonCodes.findAny(INVALID_ACK_CODE::contains);
if (reason != null) {
var disconnectReasonCode = DisconnectReasonCode.of(toUnsignedInt(reason.getValue()));
var disconnect = client.getPacketOutFactory().newDisconnect(client, disconnectReasonCode);
client.sendWithFeedback(disconnect).thenAccept(result -> client.getConnection().close());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import com.ss.mqtt.broker.handler.publish.out.PublishOutHandler;
import com.ss.mqtt.broker.model.ActionResult;
import com.ss.mqtt.broker.model.QoS;
import com.ss.mqtt.broker.model.Subscriber;
import com.ss.mqtt.broker.model.SingleSubscriber;
import com.ss.mqtt.broker.network.client.MqttClient;
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
import com.ss.mqtt.broker.service.SubscriptionService;
Expand All @@ -25,7 +25,7 @@ public void handle(@NotNull MqttClient client, @NotNull PublishInPacket packet)
}

private @NotNull ActionResult sendToSubscriber(
@NotNull Subscriber subscriber,
@NotNull SingleSubscriber subscriber,
@NotNull PublishInPacket packet
) {
return publishOutHandler(subscriber.getQos()).handle(packet, subscriber);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
abstract class AbstractPublishOutHandler implements PublishOutHandler {

@Override
public @NotNull ActionResult handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber) {
public @NotNull ActionResult handle(@NotNull PublishInPacket packet, @NotNull SingleSubscriber subscriber) {

var client = subscriber.getMqttClient();
var session = client.getSession();
Expand All @@ -30,7 +30,7 @@ abstract class AbstractPublishOutHandler implements PublishOutHandler {

protected abstract @NotNull QoS getQoS();

protected void sendPublish(
void sendPublish(
@NotNull MqttClient client,
@NotNull PublishInPacket packet,
int packetId,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.ss.mqtt.broker.handler.publish.out;

import com.ss.mqtt.broker.model.ActionResult;
import com.ss.mqtt.broker.model.SingleSubscriber;
import com.ss.mqtt.broker.model.Subscriber;
import com.ss.mqtt.broker.network.packet.in.PublishInPacket;
import org.jetbrains.annotations.NotNull;
Expand All @@ -10,5 +11,5 @@
*/
public interface PublishOutHandler {

@NotNull ActionResult handle(@NotNull PublishInPacket packet, @NotNull Subscriber subscriber);
@NotNull ActionResult handle(@NotNull PublishInPacket packet, @NotNull SingleSubscriber subscriber);
}
6 changes: 3 additions & 3 deletions src/main/java/com/ss/mqtt/broker/model/MqttVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ public enum MqttVersion {
return availableVersions[level];
}

private final String name;
private final byte[] nameInBytes;
private final @NotNull MqttPacketOutFactory packetOutFactory;
private final byte @NotNull [] nameInBytes;
private final @NotNull String name;
private final byte version;
private final MqttPacketOutFactory packetOutFactory;

MqttVersion(@NotNull String name, int version, @NotNull MqttPacketOutFactory packetOutFactory) {
this.name = name;
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/ss/mqtt/broker/model/PacketProperty.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ public enum PacketProperty {
}

private final @Getter byte id;
private final @Getter PacketDataType dataType;
private final Object defaultValue;
private final @Getter @NotNull PacketDataType dataType;
private final @Nullable Object defaultValue;

PacketProperty(int id, @NotNull PacketDataType dataType) {
this(id, dataType, null);
Expand Down
50 changes: 50 additions & 0 deletions src/main/java/com/ss/mqtt/broker/model/SharedSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package com.ss.mqtt.broker.model;

import com.ss.mqtt.broker.model.topic.SharedTopicFilter;
import com.ss.mqtt.broker.network.client.MqttClient;
import com.ss.rlib.common.util.array.Array;
import com.ss.rlib.common.util.array.ConcurrentArray;
import org.jetbrains.annotations.NotNull;

import java.util.Collection;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicInteger;

public class SharedSubscriber implements Subscriber {

private static @NotNull SingleSubscriber next(@NotNull Array<SingleSubscriber> subscribers, int current) {
return subscribers.get(current % subscribers.size());
}

private final @NotNull SharedTopicFilter topicFilter;
private final @NotNull ConcurrentArray<SingleSubscriber> subscribers;
private final @NotNull AtomicInteger current;

public SharedSubscriber(@NotNull SubscribeTopicFilter topic) {
subscribers = ConcurrentArray.ofType(Subscriber.class);
current = new AtomicInteger(0);
topicFilter = (SharedTopicFilter) topic.getTopicFilter();
}

public @NotNull SingleSubscriber getSubscriber() {
//noinspection ConstantConditions
return subscribers.getInReadLock(current.getAndIncrement(), SharedSubscriber::next);
}

public void addSubscriber(@NotNull SingleSubscriber client) {
subscribers.runInWriteLock(client, Array::add);
}

public boolean removeSubscriber(@NotNull MqttClient client) {
return subscribers.removeIfConvertedInWriteLock(client, SingleSubscriber::getMqttClient, Objects::equals);
}

public int size() {
//noinspection ConstantConditions
return subscribers.getInReadLock(Collection::size);
}

public @NotNull String getGroup() {
return topicFilter.getGroup();
}
}
21 changes: 21 additions & 0 deletions src/main/java/com/ss/mqtt/broker/model/SingleSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package com.ss.mqtt.broker.model;

import com.ss.mqtt.broker.network.client.MqttClient;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.jetbrains.annotations.NotNull;

@ToString
@EqualsAndHashCode(of = "mqttClient")
@RequiredArgsConstructor
public class SingleSubscriber implements Subscriber {

private final @Getter @NotNull MqttClient mqttClient;
private final @NotNull SubscribeTopicFilter subscribe;

public @NotNull QoS getQos() {
return subscribe.getQos();
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package com.ss.mqtt.broker.model;

import lombok.EqualsAndHashCode;
import static com.ss.mqtt.broker.util.TopicUtils.buildTopicFilter;
import com.ss.mqtt.broker.model.topic.TopicFilter;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import org.jetbrains.annotations.NotNull;
Expand Down Expand Up @@ -43,7 +44,7 @@ public class SubscribeTopicFilter {
private final boolean retainAsPublished;

public SubscribeTopicFilter(@NotNull String topicFilter, @NotNull QoS qos) {
this(TopicFilter.from(topicFilter), qos, SubscribeRetainHandling.SEND, true, true);
this(buildTopicFilter(topicFilter), qos, SubscribeRetainHandling.SEND, true, true);
}

public SubscribeTopicFilter(@NotNull TopicFilter topicFilter, @NotNull QoS qos) {
Expand Down
35 changes: 1 addition & 34 deletions src/main/java/com/ss/mqtt/broker/model/Subscriber.java
Original file line number Diff line number Diff line change
@@ -1,36 +1,3 @@
package com.ss.mqtt.broker.model;

import com.ss.mqtt.broker.model.topic.TopicFilter;
import com.ss.mqtt.broker.network.client.MqttClient;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.jetbrains.annotations.NotNull;

@ToString
@EqualsAndHashCode(of = "mqttClient")
public class Subscriber {

private final @Getter @NotNull MqttClient mqttClient;
private final @NotNull SubscribeTopicFilter subscribeTopicFilter;

/**
* Creates subscriber
*
* @param mqttClient MQTT client which will become a subscriber
* @param topicFilter topic filter that MQTT client subscribes to
*/
public Subscriber(@NotNull MqttClient mqttClient, @NotNull SubscribeTopicFilter topicFilter) {
this.mqttClient = mqttClient;
this.subscribeTopicFilter = topicFilter;
}

public @NotNull QoS getQos() {
return subscribeTopicFilter.getQos();
}

public @NotNull TopicFilter getTopicFilter() {
return subscribeTopicFilter.getTopicFilter();
}

}
public interface Subscriber {}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
import org.jetbrains.annotations.NotNull;

@Getter
@ToString
@EqualsAndHashCode
@RequiredArgsConstructor
public class StringPair {

private final String name;
private final String value;
private final @NotNull String name;
private final @NotNull String value;
}
Loading

0 comments on commit 6d16b4e

Please sign in to comment.