Skip to content

Commit

Permalink
GSet ported to delta-CRDT (akka#22187)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosubpl committed Feb 16, 2017
1 parent 5ffb08c commit d470321
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 20 deletions.
28 changes: 24 additions & 4 deletions akka-distributed-data/src/main/scala/akka/cluster/ddata/GSet.scala
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@
package akka.cluster.ddata

object GSet {
private val _empty: GSet[Any] = new GSet(Set.empty)
private val _empty: GSet[Any] = new GSet(Set.empty)(None)
def empty[A]: GSet[A] = _empty.asInstanceOf[GSet[A]]
def apply(): GSet[Any] = _empty
private[akka] def apply[A](set: Set[A]): GSet[A] = new GSet(set)(None)

/**
* Java API
*/
Expand All @@ -27,7 +29,8 @@ object GSet {
* This class is immutable, i.e. "modifying" methods return a new instance.
*/
@SerialVersionUID(1L)
final case class GSet[A](elements: Set[A]) extends ReplicatedData with ReplicatedDataSerialization with FastMerge {
final case class GSet[A] private (elements: Set[A])(_delta: Option[GSet[A]])
extends DeltaReplicatedData with ReplicatedDataSerialization with FastMerge {

type T = GSet[A]

Expand All @@ -53,15 +56,32 @@ final case class GSet[A](elements: Set[A]) extends ReplicatedData with Replicate
/**
* Adds an element to the set
*/
def add(element: A): GSet[A] = assignAncestor(copy(elements + element))
def add(element: A): GSet[A] = {
val newDelta = _delta match {
case Some(e) Some(new GSet(e.elements + element)(None))
case None Some(new GSet[A](Set.apply[A](element))(None))
}
assignAncestor(new GSet[A](elements + element)(newDelta))
}

override def merge(that: GSet[A]): GSet[A] =
if ((this eq that) || that.isAncestorOf(this)) this.clearAncestor()
else if (this.isAncestorOf(that)) that.clearAncestor()
else {
clearAncestor()
copy(elements union that.elements)
new GSet[A](elements union that.elements)(None)
}

override def delta: GSet[A] = _delta match {
case Some(d) d
case None GSet.empty[A]
}

override def resetDelta: GSet[A] = new GSet[A](elements)(None)

override def toString: String = s"G$elements"

def copy(e: Set[A] = elements) = new GSet[A](e)(_delta)
}

object GSetKey {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,30 +39,62 @@ class GSetSpec extends WordSpec with Matchers {
val c12 = c11 + user1
val c13 = c12 + user2

c13.elements should contain(user1)
c13.elements should contain(user2)
c13.elements should ===(Set(user1, user2))

// set 2
val c21 = GSet.empty[String]

val c22 = c21 + user3
val c23 = c22 + user4

c23.elements should contain(user3)
c23.elements should contain(user4)
c23.elements should ===(Set(user3, user4))

// merge both ways
val merged1 = c13 merge c23
merged1.elements should contain(user1)
merged1.elements should contain(user2)
merged1.elements should contain(user3)
merged1.elements should contain(user4)
merged1.elements should ===(Set(user1, user2, user3, user4))

val merged2 = c23 merge c13
merged2.elements should contain(user1)
merged2.elements should contain(user2)
merged2.elements should contain(user3)
merged2.elements should contain(user4)
merged2.elements should ===(Set(user1, user2, user3, user4))
}

"be able to work with deltas" in {
// set 1
val c11 = GSet.empty[String]

val c12 = c11 + user1
val c13 = c12 + user2

c12.delta.elements should ===(Set(user1))
c13.delta.elements should ===(Set(user1, user2))

// deltas build state
(c12 merge c13.delta) should ===(c13)

// own deltas are idempotent
(c13 merge c13.delta) should ===(c13)

// set 2
val c21 = GSet.empty[String]

val c22 = c21 + user3
val c23 = c22.resetDelta + user4

c22.delta.elements should ===(Set(user3))
c23.delta.elements should ===(Set(user4))

c23.elements should ===(Set(user3, user4))

val c33 = c13 merge c23

// merge both ways
val merged1 = GSet.empty[String] merge c12.delta merge c13.delta merge c22.delta merge c23.delta
merged1.elements should ===(Set(user1, user2, user3, user4))

val merged2 = GSet.empty[String] merge c23.delta merge c13.delta merge c22.delta
merged2.elements should ===(Set(user1, user2, user3, user4))

merged1 should ===(c33)
merged2 should ===(c33)
}

"be able to have its user set correctly merged with another GSet with overlapping user sets" in {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,9 @@ protected TwoPhaseSet twoPhaseSetFromBinary(byte[] bytes) {
for (String elem : msg.getRemovalsList()) {
removals = removals.add(elem);
}
return new TwoPhaseSet(adds, removals);
// GSet will accumulate deltas when adding elements,
// but those are not of interest in the result of the deserialization
return new TwoPhaseSet(adds.resetDelta(), removals.resetDelta());
} catch (Exception e) {
throw new RuntimeException(e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,13 @@ class TwoPhaseSetSerializer(val system: ExtendedActorSystem)

def twoPhaseSetFromBinary(bytes: Array[Byte]): TwoPhaseSet = {
val msg = TwoPhaseSetMessages.TwoPhaseSet.parseFrom(bytes)
TwoPhaseSet(
adds = GSet(msg.getAddsList.iterator.asScala.toSet),
removals = GSet(msg.getRemovalsList.iterator.asScala.toSet))
val addsSet = msg.getAddsList.iterator.asScala.toSet
val removalsSet = msg.getRemovalsList.iterator.asScala.toSet
val adds = addsSet.foldLeft(GSet.empty[String])((acc, el) => acc.add(el))
val removals = removalsSet.foldLeft(GSet.empty[String])((acc, el) => acc.add(el))
// GSet will accumulate deltas when adding elements,
// but those are not of interest in the result of the deserialization
TwoPhaseSet(adds.resetDelta, removals.resetDelta)
}
}
//#serializer
Expand Down
2 changes: 2 additions & 0 deletions project/MiMa.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ object MiMa extends AutoPlugin {
import com.typesafe.tools.mima.core._

val bcIssuesBetween24and25 = Seq(
// ##22269 GSet as delta-CRDT
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.GSet.this"), // constructor supplied by companion object

// #21875 delta-CRDT
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.cluster.ddata.GCounter.this"),
Expand Down

0 comments on commit d470321

Please sign in to comment.