diff --git a/redis/src/main/scala/com/avsystem/commons/redis/actor/RedisConnectionActor.scala b/redis/src/main/scala/com/avsystem/commons/redis/actor/RedisConnectionActor.scala index da2bc4ec3..d543a35bd 100644 --- a/redis/src/main/scala/com/avsystem/commons/redis/actor/RedisConnectionActor.scala +++ b/redis/src/main/scala/com/avsystem/commons/redis/actor/RedisConnectionActor.scala @@ -94,29 +94,37 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig) private def connecting(retryStrategy: RetryStrategy, readInitSender: Opt[ActorRef]): Receive = { case open: Open => + log.error(s"KUDY: connecting: Open") onOpen(open) case IncomingPacks(packs) => + log.error(s"KUDY: connecting: IncomingPacks") handlePacks(packs) case Release if reservedBy.contains(sender()) => + log.error(s"KUDY: connecting: Release if reservedBy.contains(sender())") handleRelease() case Connect => - log.debug(s"Connecting to $address") + log.error(s"KUDY: connecting: Connect") doConnect() case Connected(connection, remoteAddress, localAddress) => - log.debug(s"Connected to Redis at $address") + log.error(s"KUDY: connecting: Connected(connection, $remoteAddress, $localAddress)") //TODO: use dedicated retry strategy for initialization instead of reconnection strategy new ConnectedTo(connection, localAddress, remoteAddress).initialize(config.reconnectionStrategy) readInitSender.foreach(_ ! ReadAck) case _: ConnectionFailed => - log.error(s"Connection attempt to Redis at $address failed") + log.error(s"KUDY: connecting: ConnectionFailed attempt to Redis at $address failed") tryReconnect(retryStrategy, new ConnectionFailedException(address)) case Close(cause, stopSelf) => + log.error(s"KUDY: connecting: Close, stopSelf $stopSelf") close(cause, stopSelf, tcpConnecting = true) case ReadInit => + log.error(s"KUDY: connecting: ReadInit") // not sure if it's possible to receive ReadInit before Connected but just to be safe // delay replying with ReadAck until Connected is received + become(connecting(retryStrategy, Opt(sender()))) - case _: TcpEvent => //ignore, this is from previous connection + case e: TcpEvent => + log.error(s"KUDY: connecting: TcpEvent, $e") + //ignore, this is from previous connection } // previously this was implemented using Akka IO, now using Akka Streams in a way that mimics Akka IO @@ -207,6 +215,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig) def become(receive: Receive): Unit = context.become(receive unless { case Connected(oldConnection, _, _) if oldConnection != connection => + log.error(s"KUDY: context.become(receive unless {: IncomingPacks: Connected(oldConnection, _, _) if oldConnection != connection") oldConnection ! CloseConnection(immediate = true) }) @@ -226,16 +235,22 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig) def initializing(collector: ReplyCollector, retryStrategy: RetryStrategy): Receive = { case open: Open => + log.error(s"KUDY: initializing: Open") onOpen(open) case IncomingPacks(packs) => + log.error(s"KUDY: initializing: IncomingPacks") handlePacks(packs) case Release if reservedBy.contains(sender()) => + log.error(s"KUDY: initializing: Release if reservedBy.contains(sender())") handleRelease() case cc: ConnectionClosed => + log.error(s"KUDY: initializing: ConnectionClosed") onConnectionClosed(cc) tryReconnect(config.reconnectionStrategy, new ConnectionClosedException(address, cc.error)) case WriteAck => + log.error(s"KUDY: initializing: WriteAck") case data: ByteString => + log.error(s"KUDY: initializing: data: ByteString") logReceived(data) try decoder.decodeMore(data)(collector.processMessage(_, this)) catch { case NonFatal(cause) => @@ -245,10 +260,13 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig) sender() ! ReadAck } case ReadInit => + log.error(s"KUDY: initializing: ReadInit") sender() ! ReadAck case RetryInit(newStrategy) => + log.error(s"KUDY: initializing: RetryInit(newStrategy)") initialize(newStrategy) case Close(cause, stop) => + log.error(s"KUDY: initializing: Close(cause, stop)") close(cause, stop) } @@ -278,30 +296,38 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig) def ready: Receive = { case open: Open => + log.error(s"KUDY: ready: Open") onOpen(open) initPromise.success(()) case IncomingPacks(packs) => + log.error(s"KUDY: ready: IncomingPacks") handlePacks(packs) writeIfPossible() case Release if reservedBy.contains(sender()) => + log.error(s"KUDY: ready: Release if reservedBy.contains(sender())") handleRelease() if (watching) { unwatch = true } writeIfPossible() case WriteAck => + log.error(s"KUDY: ready: WriteAck") waitingForAck = false writeIfPossible() case cc: ConnectionClosed => + log.error(s"KUDY: ready: ConnectionClosed") onConnectionClosed(cc) val cause = new ConnectionClosedException(address, cc.error) failAlreadySent(cause) tryReconnect(config.reconnectionStrategy, cause) case data: ByteString => + log.error(s"KUDY: ready: ByteString") onMoreData(data, sender()) case ReadInit => + log.error(s"KUDY: ready: ReadInit") sender() ! ReadAck case Close(cause, stopSelf) => + log.error(s"KUDY: ready: Close(cause, stopSelf)") failQueued(cause) if (!closeIfIdle(cause, stopSelf)) { // graceful close, wait for already sent commands to finish @@ -311,22 +337,30 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig) def closing(cause: Throwable, stopSelf: Boolean): Receive = { case open: Open => + log.error(s"KUDY: closing: Open") onOpen(open) initPromise.success(()) become(ready) case IncomingPacks(packs) => + log.error(s"KUDY: closing: IncomingPacks") packs.reply(PacksResult.Failure(cause)) - case Release => //ignore + case Release => + log.error(s"KUDY: closing: Release") + //ignore case WriteAck => + log.error(s"KUDY: closing: WriteAck") waitingForAck = false case cc: ConnectionClosed => + log.error(s"KUDY: closing: ConnectionClosed") onConnectionClosed(cc) failAlreadySent(new ConnectionClosedException(address, cc.error)) close(cause, stopSelf) case data: ByteString => + log.error(s"KUDY: closing: ByteString") onMoreData(data, sender()) closeIfIdle(cause, stopSelf) case ReadInit => + log.error(s"KUDY: closing: ReadInit") sender() ! ReadAck } @@ -496,6 +530,7 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig) private def closed(cause: Throwable, tcpConnecting: Boolean): Receive = { case open: Open => + log.error(s"KUDY: closing: ByteString") onOpen(open) incarnation = 0 become(connecting(config.reconnectionStrategy, Opt.Empty)) @@ -503,14 +538,21 @@ final class RedisConnectionActor(address: NodeAddress, config: ConnectionConfig) self ! Connect } case IncomingPacks(packs) => + log.error(s"KUDY: closing: ByteString") packs.reply(PacksResult.Failure(cause)) - case Release => // ignore + case Release => + log.error(s"KUDY: closing: ByteString") + // ignore case Connected(connection, _, _) if tcpConnecting => + log.error(s"KUDY: closing: ByteString") // failure may have happened while connecting, simply close the connection connection ! CloseConnection(immediate = true) become(closed(cause, tcpConnecting = false)) - case _: TcpEvent => // ignore + case _: TcpEvent => + log.error(s"KUDY: closing: ByteString") + // ignore case Close(_, true) => + log.error(s"KUDY: closing: ByteString") stop(self) } }