Skip to content

Commit

Permalink
=rem akka#21365 enable multiple lanes in MaxThroughputSpec
Browse files Browse the repository at this point in the history
This needed the other change for each sender to send to all of the target
actors. Otherwise, large batches of messages to the same target actor would
limit the potential of actually doing work in parallel with multiple lanes due
to head-of-line blocking.
  • Loading branch information
jrudolph committed Dec 30, 2016
1 parent 35feef8 commit e66cb02
Showing 1 changed file with 82 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -70,52 +70,80 @@ object MaxThroughputSpec extends MultiNodeConfig {
actor-refs.advertisement-interval = 2 second
manifests.advertisement-interval = 2 second
}

advanced {
inbound-lanes = 2
# buffer-pool-size = 512
}
}
}
akka.remote.default-remote-dispatcher {
fork-join-executor {
# parallelism-factor = 0.5
parallelism-min = 2
parallelism-max = 2
}
# Set to 10 by default. Might be worthwhile to experiment with.
# throughput = 100
}
""")).withFallback(RemotingMultiNodeSpec.commonConfig))

case object Run
sealed trait Echo extends DeadLetterSuppression with JavaSerializable
final case object Start extends Echo
final case class Start(correspondingReceiver: ActorRef) extends Echo
final case object End extends Echo
final case class Warmup(msg: AnyRef)
final case class EndResult(totalReceived: Long) extends JavaSerializable
final case class FlowControl(burstStartTime: Long) extends Echo

def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean): Props =
Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics)).withDispatcher("akka.remote.default-remote-dispatcher")
def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int): Props =
Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics, numSenders)).withDispatcher("akka.remote.default-remote-dispatcher")

class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean) extends Actor {
class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int) extends Actor {
private var c = 0L
private val taskRunnerMetrics = new TaskRunnerMetrics(context.system)
private var endMessagesMissing = numSenders
private var correspondingSender: ActorRef = null // the Actor which send the Start message will also receive the report

def receive = {
case msg: Array[Byte]
if (msg.length != payloadSize) throw new IllegalArgumentException("Invalid message")
reporter.onMessage(1, payloadSize)
c += 1

report()
case msg: TestMessage
reporter.onMessage(1, payloadSize)
c += 1
case Start
c = 0
report()

case Start(corresponding)
if (corresponding == self) correspondingSender = sender()
sender() ! Start

case End if endMessagesMissing > 1
endMessagesMissing -= 1 // wait for End message from all senders

case End
if (printTaskRunnerMetrics)
taskRunnerMetrics.printHistograms()
sender() ! EndResult(c)
correspondingSender ! EndResult(c)
context.stop(self)

case m: Echo
sender() ! m
}

def report(): Unit = {
reporter.onMessage(1, payloadSize)
c += 1
}
}

def senderProps(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef,
def senderProps(mainTarget: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef,
printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter): Props =
Props(new Sender(target, testSettings, plotRef, printTaskRunnerMetrics, reporter))
Props(new Sender(mainTarget, targets, testSettings, plotRef, printTaskRunnerMetrics, reporter))

class Sender(target: ActorRef, testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter)
class Sender(target: ActorRef, targets: Array[ActorRef], testSettings: TestSettings, plotRef: ActorRef, printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter)
extends Actor {
val numTargets = targets.size

import testSettings._
val payload = ("0" * testSettings.payloadSize).getBytes("utf-8")
var startTime = 0L
Expand All @@ -132,50 +160,59 @@ object MaxThroughputSpec extends MultiNodeConfig {
def receive = {
case Run
if (compressionEnabled) {
target ! payload
target ! Warmup(payload)
context.setReceiveTimeout(1.second)
context.become(waitingForCompression)
} else {
sendBatch() // first some warmup
target ! Start // then Start, which will echo back here
context.become(active)
}
} else runWarmup()
}

def waitingForCompression: Receive = {
case ReceivedActorRefCompressionTable(_, table)
if (table.dictionary.contains(target)) {
sendBatch() // first some warmup
target ! Start // then Start, which will echo back here
context.setReceiveTimeout(Duration.Undefined)
context.become(active)
runWarmup()
} else
target ! payload
target ! Warmup(payload)
case ReceiveTimeout
target ! payload
target ! Warmup(payload)
}

def active: Receive = {
def runWarmup(): Unit = {
sendBatch(warmup = true) // first some warmup
targets.foreach(_ ! Start(target)) // then Start, which will echo back here
context.become(warmup)
}

def warmup: Receive = {
case Start
println(s"${self.path.name}: Starting benchmark of $totalMessages messages with burst size " +
s"$burstSize and payload size $payloadSize")
startTime = System.nanoTime
remaining = totalMessages
(0 until sent.size).foreach(i sent(i) = 0)
// have a few batches in flight to make sure there are always messages to send
(1 to 3).foreach { _
val t0 = System.nanoTime()
sendBatch()
sendBatch(warmup = false)
sendFlowControl(t0)
}

context.become(active)

case _: Warmup
}

def active: Receive = {
case c @ FlowControl(t0)
val now = System.nanoTime()
val duration = NANOSECONDS.toMillis(now - t0)
maxRoundTripMillis = math.max(maxRoundTripMillis, duration)

sendBatch()
sendBatch(warmup = false)
sendFlowControl(now)
}

val waitingForEndResult: Receive = {
case EndResult(totalReceived)
val took = NANOSECONDS.toMillis(System.nanoTime - startTime)
val throughput = (totalReceived * 1000.0 / took)
Expand All @@ -191,7 +228,7 @@ object MaxThroughputSpec extends MultiNodeConfig {
s"burst size $burstSize, " +
s"payload size $payloadSize, " +
s"total size ${totalSize(context.system)}, " +
s"$took ms to deliver $totalReceived messages")
s"$took ms to deliver $totalReceived messages.")

if (printTaskRunnerMetrics)
taskRunnerMetrics.printHistograms()
Expand All @@ -202,11 +239,12 @@ object MaxThroughputSpec extends MultiNodeConfig {
case c: ReceivedActorRefCompressionTable
}

def sendBatch(): Unit = {
val sent = new Array[Long](targets.size)
def sendBatch(warmup: Boolean): Unit = {
val batchSize = math.min(remaining, burstSize)
var i = 0
while (i < batchSize) {
val msg =
val msg0 =
if (realMessage)
TestMessage(
id = totalMessages - remaining + i,
Expand All @@ -217,17 +255,20 @@ object MaxThroughputSpec extends MultiNodeConfig {
items = Vector(TestMessage.Item(1, "A"), TestMessage.Item(2, "B")))
else payload

// target ! msg
target.tell(msg, ActorRef.noSender)
val msg1 = if (warmup) Warmup(msg0) else msg0

targets(i % numTargets).tell(msg1, ActorRef.noSender)
sent(i % numTargets) += 1
i += 1
}
remaining -= batchSize
}

def sendFlowControl(t0: Long): Unit = {
if (remaining <= 0)
target ! End
else
if (remaining <= 0) {
context.become(waitingForEndResult)
targets.foreach(_ ! End)
} else
target ! FlowControl(t0)
}
}
Expand Down Expand Up @@ -315,7 +356,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec

def identifyReceiver(name: String, r: RoleName = second): ActorRef = {
system.actorSelection(node(r) / "user" / name) ! Identify(None)
expectMsgType[ActorIdentity].ref.get
expectMsgType[ActorIdentity](10.seconds).ref.get
}

val scenarios = List(
Expand Down Expand Up @@ -365,7 +406,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec
val rep = reporter(testName)
for (n 1 to senderReceiverPairs) {
val receiver = system.actorOf(
receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1),
receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1, senderReceiverPairs),
receiverName + n)
}
enterBarrier(receiverName + "-started")
Expand All @@ -376,11 +417,12 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec
runOn(first) {
enterBarrier(receiverName + "-started")
val ignore = TestProbe()
val receivers = (for (n 1 to senderReceiverPairs) yield identifyReceiver(receiverName + n)).toArray
val senders = for (n 1 to senderReceiverPairs) yield {
val receiver = identifyReceiver(receiverName + n)
val receiver = receivers(n - 1)
val plotProbe = TestProbe()
val snd = system.actorOf(
senderProps(receiver, testSettings, plotProbe.ref, printTaskRunnerMetrics = n == 1, resultReporter),
senderProps(receiver, receivers, testSettings, plotProbe.ref, printTaskRunnerMetrics = n == 1, resultReporter),
testName + "-snd" + n)
val terminationProbe = TestProbe()
terminationProbe.watch(snd)
Expand Down

0 comments on commit e66cb02

Please sign in to comment.