diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala index cb524e0a16f..06033023610 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphMatValueSpec.scala @@ -7,8 +7,7 @@ import akka.NotUsed import akka.stream._ import akka.stream.testkit._ -import scala.concurrent.Await -import scala.concurrent.Future +import scala.concurrent.{Await, Future, Promise} import scala.concurrent.duration._ class GraphMatValueSpec extends StreamSpec { @@ -227,5 +226,19 @@ class GraphMatValueSpec extends StreamSpec { matValue should ===(NotUsed) } + + "not ignore materialized value of indentity flow which is optimized away" in { + implicit val mat = ActorMaterializer(ActorMaterializerSettings(system).withAutoFusing(false)) + val (m1, m2) = Source.single(1).viaMat(Flow[Int])(Keep.both).to(Sink.ignore).run() + m1 should ===(NotUsed) + m2 should ===(NotUsed) + + // Fails with ClassCastException if value is wrong + val m3: Promise[Option[Int]] = Source.maybe[Int].viaMat(Flow[Int])(Keep.left).to(Sink.ignore).run() + m3.success(None) + + val m4 = Source.single(1).viaMat(Flow[Int])(Keep.right).to(Sink.ignore).run() + m4 should ===(NotUsed) + } } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 1922c058dc3..f235773ecd7 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -43,8 +43,14 @@ final class Source[+Out, +Mat](override val module: Module) override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left) override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = { - if (flow.module eq GraphStages.Identity.module) this.asInstanceOf[Source[T, Mat3]] - else { + if (flow.module eq GraphStages.Identity.module) { + if (combine eq Keep.left) + this.asInstanceOf[Source[T, Mat3]] + else if (combine eq Keep.right) + this.mapMaterializedValue((_) ⇒ NotUsed).asInstanceOf[Source[T, Mat3]] + else + this.mapMaterializedValue(combine(_, NotUsed.asInstanceOf[Mat2])).asInstanceOf[Source[T, Mat3]] + } else { val flowCopy = flow.module.carbonCopy new Source( module