Skip to content

Commit

Permalink
=str deprecate ActorPublisher/Subscriber, use GraphStage (akka#21952)
Browse files Browse the repository at this point in the history
* =str deprecate ActorPublisher/Subscriber, use GraphStage

* =str deprecate Source.actorPublisher / Sink.actorSubscriber

* =str added deprecation note of ActorPublisher,Subscriber
  • Loading branch information
ktoso authored Dec 8, 2016
1 parent 591eafe commit 2ea8cd7
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 0 deletions.
16 changes: 16 additions & 0 deletions akka-docs/rst/java/stream/stream-integrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,14 @@ These can be consumed by other Reactive Stream libraries or used as an Akka Stre
ActorPublisher
--------------

.. warning::
**Deprecation warning:** ``ActorPublisher`` is deprecated in favour of the vastly more
type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also
expose a "stage actor ref" is needed to be addressed as-if an Actor.
Custom stages implemented using ``GraphStage`` are also automatically fusable.

To learn more about implementing custom stages using it refer to :ref:`graphstage-java`.

Extend :class:`akka.stream.actor.AbstractActorPublisher` to implement a
stream publisher that keeps track of the subscription life cycle and requested elements.

Expand Down Expand Up @@ -482,6 +490,14 @@ attach a ``Sink.asPublisher(AsPublisher.WITH_FANOUT)`` to enable multiple subscr
ActorSubscriber
---------------

.. warning::
**Deprecation warning:** ``ActorSubscriber`` is deprecated in favour of the vastly more
type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also
expose a "stage actor ref" is needed to be addressed as-if an Actor.
Custom stages implemented using ``GraphStage`` are also automatically fusable.

To learn more about implementing custom stages using it refer to :ref:`graphstage-scala`.

Extend :class:`akka.stream.actor.AbstractActorSubscriber` to make your class a stream subscriber with
full control of stream back pressure. It will receive
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
Expand Down
17 changes: 17 additions & 0 deletions akka-docs/rst/project/migration-guide-2.4.x-2.5.x.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,23 @@ would now be::

as the ``GraphStage`` itself is a factory of logic instances.

Deprecation of ActorSubscriber and ActorPublisher
-------------------------------------------------

The classes ``ActorPublisher`` and ``ActorSubscriber`` were the first user-facing Reactive Streams integration
API that we provided for end-users. Akka Streams APIs have evolved and improved a lot since then, and now
there is no need to use these low-level abstractions anymore. It is easy to get things wrong when implementing them,
and one would have to validate each implementation of such Actor using the Reactive Streams Technology Compatibility Kit.

The replacement API is the powerful ``GraphStage``. It has all features that raw Actors provided for implementing Stream
stages and adds additional protocol and type-safety. You can learn all about it in the documentation:
:ref:`stream-customize-scala`and :ref:`Custom stream processing in JavaDSL <stream-customize-java>`.

You should also read the blog post series on the official team blog, starting with `Mastering GraphStages, part I`_,
which explains using and implementing GraphStages in more practical terms than the reference documentation.

.. _Mastering GraphStages, part I: http://blog.akka.io/streams/2016/07/30/mastering-graph-stage-part-1

Agents
======

Expand Down
16 changes: 16 additions & 0 deletions akka-docs/rst/scala/stream/stream-integrations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,14 @@ These can be consumed by other Reactive Stream libraries or used as an Akka Stre
ActorPublisher
--------------

.. warning::
**Deprecation warning:** ``ActorPublisher`` is deprecated in favour of the vastly more
type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also
expose a "stage actor ref" is needed to be addressed as-if an Actor.
Custom stages implemented using ``GraphStage`` are also automatically fusable.

To learn more about implementing custom stages using it refer to :ref:`graphstage-scala`.

Extend/mixin :class:`akka.stream.actor.ActorPublisher` in your :class:`Actor` to make it a
stream publisher that keeps track of the subscription life cycle and requested elements.

Expand Down Expand Up @@ -482,6 +490,14 @@ subscription attempts will be rejected with an :class:`IllegalStateException`.
ActorSubscriber
---------------

.. warning::
**Deprecation warning:** ``ActorSubscriber`` is deprecated in favour of the vastly more
type-safe and safe to implement :class:`akka.stream.stage.GraphStage`. It can also
expose a "stage actor ref" is needed to be addressed as-if an Actor.
Custom stages implemented using ``GraphStage`` are also automatically fusable.

To learn more about implementing custom stages using it refer to :ref:`graphstage-scala`.

Extend/mixin :class:`akka.stream.actor.ActorSubscriber` in your :class:`Actor` to make it a
stream subscriber with full control of stream back pressure. It will receive
``ActorSubscriberMessage.OnNext``, ``ActorSubscriberMessage.OnComplete`` and ``ActorSubscriberMessage.OnError``
Expand Down
17 changes: 17 additions & 0 deletions akka-stream/src/main/scala/akka/stream/actor/ActorPublisher.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import concurrent.duration.FiniteDuration
import akka.stream.impl.CancelledSubscription
import akka.stream.impl.ReactiveStreamsCompliance._

@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
object ActorPublisher {

/**
Expand Down Expand Up @@ -120,7 +121,10 @@ object ActorPublisherMessage {
*
* If the actor is stopped the stream will be completed, unless it was not already terminated with
* failure, completed or canceled.
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
trait ActorPublisher[T] extends Actor {
import ActorPublisher.Internal._
import ActorPublisherMessage._
Expand Down Expand Up @@ -450,6 +454,7 @@ object UntypedActorPublisher {
* Java API
* @see [[akka.stream.actor.ActorPublisher]]
*/
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
abstract class UntypedActorPublisher[T] extends UntypedActor with ActorPublisher[T]

/**
Expand All @@ -467,26 +472,38 @@ object AbstractActorPublisher {
/**
* Java API compatible with lambda expressions
* @see [[akka.stream.actor.ActorPublisher]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
abstract class AbstractActorPublisher[T] extends AbstractActor with ActorPublisher[T]

/**
* Java API compatible with lambda expressions.
* This class adds a Stash to {@link AbstractActorPublisher}.
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithStash]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
abstract class AbstractActorPublisherWithStash[T] extends AbstractActor with ActorPublisher[T] with Stash

/**
* Java API compatible with lambda expressions.
* This class adds an unbounded Stash to {@link AbstractActorPublisher}.
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnboundedStash]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
abstract class AbstractActorPublisherWithUnboundedStash[T] extends AbstractActor with ActorPublisher[T] with UnboundedStash

/**
* Java API compatible with lambda expressions.
* This class adds an unrestricted Stash to {@link AbstractActorPublisher}.
* @see [[akka.stream.actor.ActorPublisher]] and [[akka.stream.actor.AbstractActorWithUnrestrictedStash]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
abstract class AbstractActorPublisherWithUnrestrictedStash[T] extends AbstractActor with ActorPublisher[T] with UnrestrictedStash
12 changes: 12 additions & 0 deletions akka-stream/src/main/scala/akka/stream/actor/ActorSubscriber.scala
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,10 @@ abstract class MaxInFlightRequestStrategy(max: Int) extends RequestStrategy {
* together with [[ZeroRequestStrategy]] or some other strategy. In that case
* you must also call [[#request]] when the actor is started or when it is ready, otherwise
* it will not receive any elements.
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
trait ActorSubscriber extends Actor {
import ActorSubscriber._
import ActorSubscriberMessage._
Expand Down Expand Up @@ -347,12 +350,18 @@ object UntypedActorSubscriber {
/**
* Java API
* @see [[akka.stream.actor.ActorSubscriber]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
abstract class UntypedActorSubscriber extends UntypedActor with ActorSubscriber

/**
* Java API compatible with lambda expressions
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
object AbstractActorSubscriber {
/**
* Java API compatible with lambda expressions: Attach a [[AbstractActorSubscriber]] actor
Expand All @@ -365,5 +374,8 @@ object AbstractActorSubscriber {
/**
* Java API compatible with lambda expressions
* @see [[akka.stream.actor.ActorSubscriber]]
*
* @deprecated Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
abstract class AbstractActorSubscriber extends AbstractActor with ActorSubscriber
3 changes: 3 additions & 0 deletions akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,10 @@ object Sink {
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorSubscriber]].
*
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
def actorSubscriber[T](props: Props): Sink[T, ActorRef] =
new Sink(scaladsl.Sink.actorSubscriber(props))

Expand Down
3 changes: 3 additions & 0 deletions akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,10 @@ object Source {
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` should
* be [[akka.stream.actor.ActorPublisher]].
*
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
def actorPublisher[T](props: Props): Source[T, ActorRef] =
new Source(scaladsl.Source.actorPublisher(props))

Expand Down
3 changes: 3 additions & 0 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,10 @@ object Sink {
* Creates a `Sink` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
* be [[akka.stream.actor.ActorSubscriber]].
*
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
def actorSubscriber[T](props: Props): Sink[T, ActorRef] = {
require(classOf[ActorSubscriber].isAssignableFrom(props.actorClass()), "Actor must be ActorSubscriber")
new Sink(new ActorSubscriberSink(props, DefaultAttributes.actorSubscriberSink, shape("ActorSubscriberSink")))
Expand Down
3 changes: 3 additions & 0 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,10 @@ object Source {
* Creates a `Source` that is materialized to an [[akka.actor.ActorRef]] which points to an Actor
* created according to the passed in [[akka.actor.Props]]. Actor created by the `props` must
* be [[akka.stream.actor.ActorPublisher]].
*
* @deprecated Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.
*/
@deprecated("Use `akka.stream.stage.GraphStage` and `fromGraph` instead, it allows for all operations an Actor would and is more type-safe as well as guaranteed to be ReactiveStreams compliant.", since = "2.5.0")
def actorPublisher[T](props: Props): Source[T, ActorRef] = {
require(classOf[ActorPublisher[_]].isAssignableFrom(props.actorClass()), "Actor must be ActorPublisher")
new Source(new ActorPublisherSource(props, DefaultAttributes.actorPublisherSource, shape("ActorPublisherSource")))
Expand Down

0 comments on commit 2ea8cd7

Please sign in to comment.