Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Revert update to Finagle 6+ until dependencies can be advanced to the…
Browse files Browse the repository at this point in the history
… compatible versions

naggati still uses Util version of 5.x, this causes memcache clients to behave incorrectly. I am reverting the change to update the finagle (and util) version until I can update naggati.

I will move this to birdcage manually once the repo is ready in birdcage.

RB_ID=141014
  • Loading branch information
dhamanka committed Apr 19, 2013
1 parent a4decc5 commit ac8a224
Show file tree
Hide file tree
Showing 6 changed files with 14 additions and 22 deletions.
2 changes: 1 addition & 1 deletion project/Build.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import Keys._
import com.twitter.sbt._

object Kestrel extends Build {
val finagleVersion = "6.1.0"
val finagleVersion = "5.3.23"

lazy val root = Project(
id = "kestrel",
Expand Down
16 changes: 4 additions & 12 deletions src/main/scala/net/lag/kestrel/KestrelHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import com.twitter.util._
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.mutable
import scala.collection.Set
import java.util.concurrent.CancellationException

class TooManyOpenReadsException extends Exception("Too many open reads.")
object TooManyOpenReadsException extends TooManyOpenReadsException
Expand Down Expand Up @@ -222,17 +221,6 @@ abstract class KestrelHandler(
}
val startTime = Time.now
val future = queues.remove(key, timeout, opening, peeking, Some(sessionDescription))

// Add an exception handler to handle the cancellation exceptions
future.onFailure {
case _: CancellationException | _: FutureCancelledException =>
// if the connection is closed, pre-emptively return un-acked items.
abortAnyOpenRead(Kestrel.traceSessions)
case e =>
// There is no clean-up action here, simply log the fact that this happened
log.info("Exception thrown in get %s", e)
}

waitingFor = Some(future)
future.map { itemOption =>
waitingFor = None
Expand All @@ -242,6 +230,10 @@ abstract class KestrelHandler(
}
itemOption
}
future.onCancellation {
// if the connection is closed, pre-emptively return un-acked items.
abortAnyOpenRead(Kestrel.traceSessions)
}
future
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/net/lag/kestrel/MemcacheHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class MemcacheHandler(
serverStatus) with SimplePendingReads
log.debug("New session %d from %s", sessionId, clientDescription)

override def close(deadline: Time) = {
override def release() {
handler.finish()
super.close(deadline)
super.release()
}

protected def clientDescription: String = {
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/net/lag/kestrel/PersistentQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
// checking future.isCancelled is a race, but only means that an item may be removed &
// then un-removed at a higher level if the connection is closed. it's an optimization
// to let un-acked items get returned before this timeout.
if (promise.isInterrupted.isDefined) {
if (promise.isCancelled) {
promise.setValue(None)
waiters.trigger()
} else {
Expand All @@ -427,7 +427,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
promise.setValue(None)
}
val w = waiters.add(deadline.get, onTrigger, onTimeout)
promise.setInterruptHandler { case _ => waiters.remove(w) }
promise.onCancellation { waiters.remove(w) }
false
} else {
true
Expand Down
4 changes: 2 additions & 2 deletions src/main/scala/net/lag/kestrel/TextHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ class TextHandler(
"%s:%d".format(address.getAddress.getHostAddress, address.getPort)
}

override def close(deadline: Time) = {
override def release() {
handler.finish()
super.close(deadline)
super.release()
}

def apply(request: TextRequest): Future[TextResponse] = {
Expand Down
6 changes: 3 additions & 3 deletions src/main/scala/net/lag/kestrel/ThriftHandler.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package net.lag.kestrel
import com.twitter.conversions.time._
import com.twitter.finagle.ClientConnection
import com.twitter.logging.Logger
import com.twitter.util.{Duration, Future, Promise, Time, Timer, TimerTask}
import com.twitter.util.{Duration, Future, Promise, Timer, TimerTask}
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
Expand All @@ -32,9 +32,9 @@ import scala.collection.Set
class ThriftFinagledService(val handler: ThriftHandler, val protocolFactory: TProtocolFactory)
extends thrift.Kestrel.FinagledService(handler, protocolFactory) {

override def close(deadline: Time) = {
override def release() {
handler.release()
super.close(deadline)
super.release()
}
}

Expand Down

0 comments on commit ac8a224

Please sign in to comment.