Skip to content

Commit

Permalink
=str 20448 splitAfter should emit substreams in a lazy way (akka#21306)
Browse files Browse the repository at this point in the history
  • Loading branch information
agolubev authored and ktoso committed Dec 14, 2016
1 parent 5b5cf4f commit 4207682
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package akka.stream.scaladsl
import akka.NotUsed
import akka.stream._
import akka.stream.Supervision.resumingDecider
import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream.testkit.Utils._
import org.reactivestreams.Publisher
Expand Down Expand Up @@ -260,6 +261,40 @@ class FlowSplitAfterSpec extends StreamSpec {
}
}

"work when last element is split-by" in assertAllStagesStopped {
new SubstreamsSupport(splitAfter = 3, elementCount = 3) {
val s1 = StreamPuppet(expectSubFlow().runWith(Sink.asPublisher(false)))
masterSubscriber.expectNoMsg(100.millis)

s1.request(3)
s1.expectNext(1)
s1.expectNext(2)
s1.expectNext(3)
s1.expectComplete()

masterSubscription.request(1)
masterSubscriber.expectComplete()
}
}

"fail stream if substream not materialized in time" in assertAllStagesStopped {
val tightTimeoutMaterializer =
ActorMaterializer(ActorMaterializerSettings(system)
.withSubscriptionTimeoutSettings(
StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.cancel, 500.millisecond)))

val testSource = Source.single(1).concat(Source.maybe).splitAfter(_ true)

a[SubscriptionTimeoutException] mustBe thrownBy {
Await.result(
testSource.lift
.delay(1.second)
.flatMapConcat(identity)
.runWith(Sink.ignore)(tightTimeoutMaterializer),
3.seconds)
}
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -429,22 +429,19 @@ final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, va

setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (substreamSource eq null) pull(in)
else if (!substreamWaitingToBePushed) {
push(out, Source.fromGraph(substreamSource.source))
scheduleOnce(SubscriptionTimer, timeout)
substreamWaitingToBePushed = true
}
if (substreamSource eq null) {
//can be already pulled from substream in case split after
if (!hasBeenPulled(in)) pull(in)
} else if (substreamWaitingToBePushed) pushSubstreamSource()
}

override def onDownstreamFinish(): Unit = {
// If the substream is already cancelled or it has not been handed out, we can go away
if (!substreamWaitingToBePushed || substreamCancelled) completeStage()
if ((substreamSource eq null) || substreamWaitingToBePushed || substreamCancelled) completeStage()
}
})

// initial input handler
setHandler(in, new InHandler {
val initInHandler = new InHandler {
override def onPush(): Unit = {
val handler = new SubstreamHandler
val elem = grab(in)
Expand All @@ -460,7 +457,10 @@ final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, va
handOver(handler)
}
override def onUpstreamFinish(): Unit = completeStage()
})
}

// initial input handler
setHandler(in, initInHandler)

private def handOver(handler: SubstreamHandler): Unit = {
if (isClosed(out)) completeStage()
Expand All @@ -472,13 +472,17 @@ final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, va
setKeepGoing(enabled = handler.hasInitialElement)

if (isAvailable(out)) {
push(out, Source.fromGraph(substreamSource.source))
scheduleOnce(SubscriptionTimer, timeout)
substreamWaitingToBePushed = true
} else substreamWaitingToBePushed = false
if (decision == SplitBefore || handler.hasInitialElement) pushSubstreamSource() else pull(in)
} else substreamWaitingToBePushed = true
}
}

private def pushSubstreamSource(): Unit = {
push(out, Source.fromGraph(substreamSource.source))
scheduleOnce(SubscriptionTimer, timeout)
substreamWaitingToBePushed = false
}

override protected def onTimer(timerKey: Any): Unit = substreamSource.timeout(timeout)

private class SubstreamHandler extends InHandler with OutHandler {
Expand All @@ -503,6 +507,7 @@ final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, va
}

override def onPull(): Unit = {
cancelTimer(SubscriptionTimer)
if (hasInitialElement) {
substreamSource.push(firstElem)
firstElem = null.asInstanceOf[T]
Expand Down Expand Up @@ -530,7 +535,12 @@ final class Split[T](val decision: Split.SplitDecision, val p: T ⇒ Boolean, va
if (p(elem)) {
val handler = new SubstreamHandler
closeThis(handler, elem)
handOver(handler)
if (decision == SplitBefore) handOver(handler)
else {
substreamSource = null
setHandler(in, initInHandler)
pull(in)
}
} else {
// Drain into the void
if (substreamCancelled) pull(in)
Expand Down

0 comments on commit 4207682

Please sign in to comment.