Skip to content

Commit

Permalink
rememberingEntities with ddata mode, akka#22154
Browse files Browse the repository at this point in the history
* one Replicator per configured role
* log LMDB directory at startup
* clarify the importance of the LMDB directory
* use more than one key to support many entities
  • Loading branch information
patriknw committed Jan 23, 2017
1 parent 8fd5b7e commit 37679d3
Show file tree
Hide file tree
Showing 23 changed files with 699 additions and 323 deletions.
11 changes: 11 additions & 0 deletions akka-cluster-sharding/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,17 @@ akka.cluster.sharding {
# The "role" of the singleton configuration is not used. The singleton role will
# be the same as "akka.cluster.sharding.role".
coordinator-singleton = ${akka.cluster.singleton}

# Settings for the Distributed Data replicator. Used when state-store-mode=ddata.
# Same layout as akka.cluster.distributed-data.
# The "role" of the distributed-data configuration is not used. The distributed-data
# role will be the same as "akka.cluster.sharding.role".
# Note that there is one Replicator per role and it's not possible
# to have different distributed-data settings for different sharding entity types.
distributed-data = ${akka.cluster.distributed-data}
distributed-data {
durable.keys = ["shard-*"]
}

# The id of the dispatcher to use for ClusterSharding actors.
# If not specified default dispatcher is used.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import akka.pattern.BackoffSupervisor
import akka.util.ByteString
import akka.pattern.ask
import akka.dispatch.Dispatchers
import akka.cluster.ddata.ReplicatorSettings
import akka.cluster.ddata.Replicator
import scala.util.control.NonFatal
import akka.actor.Status

/**
* This extension provides sharding functionality of actors in a cluster.
Expand Down Expand Up @@ -413,73 +417,114 @@ private[akka] class ClusterShardingGuardian extends Actor {

val cluster = Cluster(context.system)
val sharding = ClusterSharding(context.system)
lazy val replicator = DistributedData(context.system).replicator

// ReplicatorSettings for sharding's own Distributed Data replicators,
// read from akka.cluster.sharding.distributed-data (same layout as
// akka.cluster.distributed-data, see reference.conf in this commit).
private lazy val replicatorSettings =
ReplicatorSettings(context.system.settings.config.getConfig(
"akka.cluster.sharding.distributed-data"))
// One Replicator actor is created lazily per configured role and cached
// here, keyed by the optional role name (None = no role restriction).
private var replicatorByRole = Map.empty[Option[String], ActorRef]

/** Name of the ClusterSingletonManager child actor for the given URL-encoded type name. */
private def coordinatorSingletonManagerName(encName: String): String =
  s"${encName}Coordinator"

// Address-free path to the actual coordinator actor:
// <guardian>/<encName>Coordinator/singleton/coordinator — i.e. the
// "coordinator" child started by the BackoffSupervisor inside the
// ClusterSingletonManager's "singleton" child (see the Start handler).
private def coordinatorPath(encName: String): String =
(self.path / coordinatorSingletonManagerName(encName) / "singleton" / "coordinator").toStringWithoutAddress

// Returns the Replicator to use for the given sharding settings.
// When state-store-mode=ddata one Replicator per role is created lazily
// and cached in replicatorByRole (commit note: "one Replicator per
// configured role"); for any other mode deadLetters is returned as a
// harmless stand-in ActorRef.
// NOTE(review): the `⇒` arrows of the case clauses were lost when this
// page was extracted (e.g. `case Some(ref) ref` should read
// `case Some(ref) ⇒ ref`); the code below is otherwise unchanged.
private def replicator(settings: ClusterShardingSettings): ActorRef = {
if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModeDData) {
// one Replicator per role
replicatorByRole.get(settings.role) match {
case Some(ref) ref
case None
// Role-specific replicators get a URL-encoded "<role>Replicator"
// name; the role-less replicator is simply "replicator".
val name = settings.role match {
case Some(r) URLEncoder.encode(r, ByteString.UTF_8) + "Replicator"
case None "replicator"
}
val ref = context.actorOf(Replicator.props(replicatorSettings.withRole(settings.role)), name)
replicatorByRole = replicatorByRole.updated(settings.role, ref)
ref
}
} else
context.system.deadLetters
}

// NOTE(review): this span is a GitHub diff rendered without +/- markers,
// so the REMOVED (pre-commit) and ADDED (post-commit) versions of both
// message handlers appear interleaved below. It is not one coherent
// method body: `shardRegion` is bound twice per handler and
// `sender() ! Started(shardRegion)` appears both inside and after the
// added try/catch. The hunk boundaries are marked below.
def receive = {
case Start(typeName, entityProps, settings, extractEntityId, extractShardId, allocationStrategy, handOffStopMessage)
import settings.role
import settings.tuningParameters.coordinatorFailureBackoff

val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
val cName = coordinatorSingletonManagerName(encName)
val cPath = coordinatorPath(encName)
// NOTE(review): pre-change version of the Start handler (removed lines)
// begins here — no try/catch, string-literal "persistence" comparison,
// and the old single `replicator` val.
val shardRegion = context.child(encName).getOrElse {
if (context.child(cName).isEmpty) {
val coordinatorProps =
if (settings.stateStoreMode == "persistence")
ShardCoordinator.props(typeName, settings, allocationStrategy)
else
ShardCoordinator.props(typeName, settings, allocationStrategy, replicator)
val singletonProps = BackoffSupervisor.props(
childProps = coordinatorProps,
childName = "coordinator",
minBackoff = coordinatorFailureBackoff,
maxBackoff = coordinatorFailureBackoff * 5,
randomFactor = 0.2).withDeploy(Deploy.local)
val singletonSettings = settings.coordinatorSingletonSettings
.withSingletonName("singleton").withRole(role)
// NOTE(review): post-change version (added lines) begins here — wraps
// startup in try/catch, uses the StateStoreMode* constants, and passes
// the per-role replicator `rep` to coordinator and region.
try {
import settings.role
import settings.tuningParameters.coordinatorFailureBackoff

val rep = replicator(settings)
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
val cName = coordinatorSingletonManagerName(encName)
val cPath = coordinatorPath(encName)
val shardRegion = context.child(encName).getOrElse {
if (context.child(cName).isEmpty) {
val coordinatorProps =
if (settings.stateStoreMode == ClusterShardingSettings.StateStoreModePersistence)
ShardCoordinator.props(typeName, settings, allocationStrategy)
else {
ShardCoordinator.props(typeName, settings, allocationStrategy, rep)
}
val singletonProps = BackoffSupervisor.props(
childProps = coordinatorProps,
childName = "coordinator",
minBackoff = coordinatorFailureBackoff,
maxBackoff = coordinatorFailureBackoff * 5,
randomFactor = 0.2).withDeploy(Deploy.local)
val singletonSettings = settings.coordinatorSingletonSettings
.withSingletonName("singleton").withRole(role)
context.actorOf(
ClusterSingletonManager.props(
singletonProps,
terminationMessage = PoisonPill,
singletonSettings).withDispatcher(context.props.dispatcher),
name = cName)
}

context.actorOf(
ClusterSingletonManager.props(
singletonProps,
terminationMessage = PoisonPill,
singletonSettings).withDispatcher(context.props.dispatcher),
name = cName)
ShardRegion.props(
typeName = typeName,
entityProps = entityProps,
settings = settings,
coordinatorPath = cPath,
extractEntityId = extractEntityId,
extractShardId = extractShardId,
handOffStopMessage = handOffStopMessage,
replicator = rep).withDispatcher(context.props.dispatcher),
name = encName)
}

context.actorOf(
ShardRegion.props(
typeName = typeName,
entityProps = entityProps,
settings = settings,
coordinatorPath = cPath,
extractEntityId = extractEntityId,
extractShardId = extractShardId,
handOffStopMessage = handOffStopMessage).withDispatcher(context.props.dispatcher),
name = encName)
sender() ! Started(shardRegion)
} catch {
case NonFatal(e)
// don't restart
// could be invalid ReplicatorSettings, or InvalidActorNameException
// if it has already been started
sender() ! Status.Failure(e)
}
sender() ! Started(shardRegion)

// NOTE(review): StartProxy handler — same interleaving: old version
// first (removed lines), then the added try/catch version that passes
// replicator = deadLetters to the proxy region.
case StartProxy(typeName, settings, extractEntityId, extractShardId)
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
val cName = coordinatorSingletonManagerName(encName)
val cPath = coordinatorPath(encName)
val shardRegion = context.child(encName).getOrElse {
context.actorOf(
ShardRegion.proxyProps(
typeName = typeName,
settings = settings,
coordinatorPath = cPath,
extractEntityId = extractEntityId,
extractShardId = extractShardId).withDispatcher(context.props.dispatcher),
name = encName)
// NOTE(review): post-change StartProxy version (added lines) begins here.
try {
val encName = URLEncoder.encode(typeName, ByteString.UTF_8)
val cName = coordinatorSingletonManagerName(encName)
val cPath = coordinatorPath(encName)
val shardRegion = context.child(encName).getOrElse {
context.actorOf(
ShardRegion.proxyProps(
typeName = typeName,
settings = settings,
coordinatorPath = cPath,
extractEntityId = extractEntityId,
extractShardId = extractShardId,
replicator = context.system.deadLetters).withDispatcher(context.props.dispatcher),
name = encName)
}
sender() ! Started(shardRegion)
} catch {
case NonFatal(e)
// don't restart
// could be InvalidActorNameException if it has already been started
sender() ! Status.Failure(e)
}
sender() ! Started(shardRegion)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ import com.typesafe.config.Config
import akka.cluster.singleton.ClusterSingletonManagerSettings

object ClusterShardingSettings {

val StateStoreModePersistence = "persistence"
val StateStoreModeDData = "ddata"

/**
* Create settings from the default configuration
* `akka.cluster.sharding`.
Expand Down Expand Up @@ -155,9 +159,10 @@ final class ClusterShardingSettings(
val tuningParameters: ClusterShardingSettings.TuningParameters,
val coordinatorSingletonSettings: ClusterSingletonManagerSettings) extends NoSerializationVerificationNeeded {

import ClusterShardingSettings.{ StateStoreModePersistence, StateStoreModeDData }
require(
stateStoreMode == "persistence" || stateStoreMode == "ddata",
s"Unknown 'state-store-mode' [$stateStoreMode], valid values are 'persistence' or 'ddata'")
stateStoreMode == StateStoreModePersistence || stateStoreMode == StateStoreModeDData,
s"Unknown 'state-store-mode' [$stateStoreMode], valid values are '$StateStoreModeDData' or '$StateStoreModePersistence'")

def withRole(role: String): ClusterShardingSettings = copy(role = ClusterShardingSettings.roleOption(role))

Expand Down
Loading

0 comments on commit 37679d3

Please sign in to comment.