Skip to content

Commit

Permalink
=rem akka#21365 cleanup multiple lane creation slightly
Browse files Browse the repository at this point in the history
  • Loading branch information
jrudolph committed Dec 30, 2016
1 parent 2f5f93d commit 73ff3e1
Showing 1 changed file with 16 additions and 20 deletions.
36 changes: 16 additions & 20 deletions akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -693,35 +693,31 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R

} else {
val hubKillSwitch = KillSwitches.shared("hubKillSwitch")
val source = aeronSource(ordinaryStreamId, envelopeBufferPool)
.via(hubKillSwitch.flow)
.via(inboundFlow(compression))
.map(env (env.recipient, env))

val (resourceLife, broadcastHub) =
source
aeronSource(ordinaryStreamId, envelopeBufferPool)
.via(hubKillSwitch.flow)
.via(inboundFlow(compression))
.map(env (env.recipient, env))
.toMat(BroadcastHub.sink(bufferSize = settings.Advanced.InboundBroadcastHubBufferSize))(Keep.both)
.run()(materializer)

val lane = inboundSink(envelopeBufferPool)

// select lane based on destination, to preserve message order
val partitionFun: OptionVal[ActorRef] Int = {
_ match {
case OptionVal.Some(r) math.abs(r.path.uid) % inboundLanes
case OptionVal.None 0
def shouldUseLane(recipient: OptionVal[ActorRef], targetLane: Int): Boolean =
recipient match {
case OptionVal.Some(r) math.abs(r.path.uid) % inboundLanes == targetLane
case OptionVal.None 0 == targetLane
}
}

val lane = inboundSink(envelopeBufferPool)
val completedValues: Vector[Future[Done]] =
(0 until inboundLanes).map { i
broadcastHub.runWith(
(0 until inboundLanes).map { laneId
broadcastHub
// TODO replace filter with "PartitionHub" when that is implemented
// must use a tuple here because envelope is pooled and must only be touched in the selected lane
Flow[(OptionVal[ActorRef], InboundEnvelope)].collect {
case (recipient, env) if partitionFun(recipient) == i env
}
.toMat(lane)(Keep.right))(materializer)
// must use a tuple here because envelope is pooled and must only be read in the selected lane
// otherwise, the lane that actually processes it might have already released it.
.collect { case (recipient, env) if shouldUseLane(recipient, laneId) env }
.toMat(lane)(Keep.right)
.run()(materializer)
}(collection.breakOut)

import system.dispatcher
Expand Down

0 comments on commit 73ff3e1

Please sign in to comment.