Skip to content

Commit

Permalink
fix ORMap duplicated delta propagation (akka#22606)
Browse files Browse the repository at this point in the history
  • Loading branch information
gosubpl committed Mar 23, 2017
1 parent 2d00655 commit 2caef78
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 14 deletions.
32 changes: 18 additions & 14 deletions akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMap.scala
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,10 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
"value, because important history can be lost when replacing the `ORSet` and " +
"undesired effects of merging will occur. Use `ORMultiMap` or `ORMap.updated` instead.")
else {
val putDeltaOp = PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key value, zeroTag)
// put forcibly damages history, so we propagate full value that will overwrite previous values
new ORMap(keys.add(node, key), values.updated(key, value), zeroTag, Some(newDelta(putDeltaOp)))
val newKeys = keys.resetDelta.add(node, key)
val putDeltaOp = PutDeltaOp(newKeys.delta.get, key value, zeroTag)
// put forcibly damages history, so we consciously propagate full value that will overwrite previous value
new ORMap(newKeys, values.updated(key, value), zeroTag, Some(newDelta(putDeltaOp)))
}

/**
Expand Down Expand Up @@ -263,20 +264,21 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
// we can emit (and later merge) their deltas instead of full updates.
// However to avoid necessity of tombstones, the derived map type needs to support this
// with clearing the value (e.g. removing all elements if value is a set)
// before removing the key - like e.g. ORMultiMap does
// before removing the key - like e.g. ORMultiMap.emptyWithValueDeltas does
val newKeys = keys.resetDelta.add(node, key)
oldValue match {
case _: DeltaReplicatedData if valueDeltas
val newValue = modify(oldValue.asInstanceOf[DeltaReplicatedData].resetDelta.asInstanceOf[B])
val newValueDelta = newValue.asInstanceOf[DeltaReplicatedData].delta
val deltaOp = newValueDelta match {
case Some(d) if hasOldValue UpdateDeltaOp(keys.resetDelta.add(node, key).delta.get, Map(key d), zeroTag)
case _ PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key newValue, zeroTag)
case Some(d) if hasOldValue UpdateDeltaOp(newKeys.delta.get, Map(key d), zeroTag)
case _ PutDeltaOp(newKeys.delta.get, key newValue, zeroTag)
}
new ORMap(keys.add(node, key), values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
new ORMap(newKeys, values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
case _
val newValue = modify(oldValue)
val deltaOp = PutDeltaOp(keys.resetDelta.add(node, key).delta.get, key newValue, zeroTag)
new ORMap(keys.add(node, key), values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
val deltaOp = PutDeltaOp(newKeys.delta.get, key newValue, zeroTag)
new ORMap(newKeys, values.updated(key, newValue), zeroTag, Some(newDelta(deltaOp)))
}
}

Expand All @@ -299,8 +301,9 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
*/
@InternalApi private[akka] def remove(node: UniqueAddress, key: A): ORMap[A, B] = {
// for removals the delta values map emitted will be empty
val removeDeltaOp = RemoveDeltaOp(keys.resetDelta.remove(node, key).delta.get, zeroTag)
new ORMap(keys.remove(node, key), values - key, zeroTag, Some(newDelta(removeDeltaOp)))
val newKeys = keys.resetDelta.remove(node, key)
val removeDeltaOp = RemoveDeltaOp(newKeys.delta.get, zeroTag)
new ORMap(newKeys, values - key, zeroTag, Some(newDelta(removeDeltaOp)))
}

/**
Expand All @@ -309,8 +312,9 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (
* by keeping the vvector (in form of key -> value pair) for deleted keys
*/
@InternalApi private[akka] def removeKey(node: UniqueAddress, key: A): ORMap[A, B] = {
val removeKeyDeltaOp = RemoveKeyDeltaOp(keys.resetDelta.remove(node, key).delta.get, key, zeroTag)
new ORMap(keys.remove(node, key), values, zeroTag, Some(newDelta(removeKeyDeltaOp)))
val newKeys = keys.resetDelta.remove(node, key)
val removeKeyDeltaOp = RemoveKeyDeltaOp(newKeys.delta.get, key, zeroTag)
new ORMap(newKeys, values, zeroTag, Some(newDelta(removeKeyDeltaOp)))
}

private def dryMerge(that: ORMap[A, B], mergedKeys: ORSet[A], valueKeysIterator: Iterator[A]): ORMap[A, B] = {
Expand Down Expand Up @@ -353,7 +357,7 @@ final class ORMap[A, B <: ReplicatedData] private[akka] (

override def resetDelta: ORMap[A, B] =
if (delta.isEmpty) this
else new ORMap[A, B](keys, values, zeroTag = zeroTag)
else new ORMap[A, B](keys.resetDelta, values, zeroTag = zeroTag)

override def mergeDelta(thatDelta: ORMap.DeltaOp): ORMap[A, B] = {
// helper function to simplify folds below
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.cluster.ddata

import akka.actor.Address
import akka.cluster.UniqueAddress
import akka.cluster.ddata.ORSet.AddDeltaOp
import akka.cluster.ddata.Replicator.Changed
import org.scalatest.Matchers
import org.scalatest.WordSpec
Expand Down Expand Up @@ -141,6 +142,30 @@ class ORMapSpec extends WordSpec with Matchers {
merged2.entries("c").elements should be(Set("C"))
}

"do not have divergence in dot versions between the underlying map and ormap delta" in {
val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A")

val deltaVersion = m1.delta.get match {
case ORMap.PutDeltaOp(delta, v, dt)
delta match {
case AddDeltaOp(u)
if (u.elementsMap.contains("a"))
Some(u.elementsMap("a").versionAt(node1))
else
None
case _ None
}
case _ None
}

val fullVersion =
if (m1.keys.elementsMap.contains("a"))
Some(m1.keys.elementsMap("a").versionAt(node1))
else
None
deltaVersion should ===(fullVersion)
}

"not have anomalies for remove+updated scenario and deltas" in {
val m1 = ORMap.empty.put(node1, "a", GSet.empty + "A").put(node1, "b", GSet.empty + "B")
val m2 = ORMap.empty.put(node2, "c", GSet.empty + "C")
Expand Down

0 comments on commit 2caef78

Please sign in to comment.