Skip to content

Commit

Permalink
akka#21955: Fixed the case when identity flow appended to Source comb…
Browse files Browse the repository at this point in the history
…ining materialized values (akka#21956)
  • Loading branch information
drewhk authored Dec 19, 2016
1 parent 2cb912d commit 1085179
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import akka.NotUsed
import akka.stream._
import akka.stream.testkit._

import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

class GraphMatValueSpec extends StreamSpec {
Expand Down Expand Up @@ -227,5 +226,19 @@ class GraphMatValueSpec extends StreamSpec {
matValue should ===(NotUsed)

}

"not ignore materialized value of indentity flow which is optimized away" in {
implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false))
val (m1, m2) = Source.single(1).viaMat(Flow[Int])(Keep.both).to(Sink.ignore).run()
m1 should ===(NotUsed)
m2 should ===(NotUsed)

// Fails with ClassCastException if value is wrong
val m3: Promise[Option[Int]] = Source.maybe[Int].viaMat(Flow[Int])(Keep.left).to(Sink.ignore).run()
m3.success(None)

val m4 = Source.single(1).viaMat(Flow[Int])(Keep.right).to(Sink.ignore).run()
m4 should ===(NotUsed)
}
}
}
10 changes: 8 additions & 2 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,14 @@ final class Source[+Out, +Mat](override val module: Module)
override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)

override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) Mat3): Source[T, Mat3] = {
if (flow.module eq GraphStages.Identity.module) this.asInstanceOf[Source[T, Mat3]]
else {
if (flow.module eq GraphStages.Identity.module) {
if (combine eq Keep.left)
this.asInstanceOf[Source[T, Mat3]]
else if (combine eq Keep.right)
this.mapMaterializedValue((_) NotUsed).asInstanceOf[Source[T, Mat3]]
else
this.mapMaterializedValue(combine(_, NotUsed.asInstanceOf[Mat2])).asInstanceOf[Source[T, Mat3]]
} else {
val flowCopy = flow.module.carbonCopy
new Source(
module
Expand Down

0 comments on commit 1085179

Please sign in to comment.