Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Kestrel replay fix
Browse files Browse the repository at this point in the history
Kestrel replay fix - Part 1:

Problem: The pack request is triggered by fillReadBehind, whereas journal.remove updates removesSinceReadBehind after the fillReadBehind call. This ordering leaves two windows in which we will either lose an item or deliver an extra (potentially dummy/invalid) item.

This change addresses the problem as follows:
1. In the normal mode - fillReadBehind is called after the remove has already been logged in the journal - this makes sure that the removesSinceReadBehind count has already been updated to reflect the fact that the current item has been removed from the in-memory queue which is used as the source of the pack. The same is done for the discard/expiration workflows
2. In the replay mode - Since journal removes are already logged (or are not being newly logged) the fillReadBehind is done as before.
3. The pack logic calls a fsync on the journal file before the packed file replaces the old journal files on disk to ensure that the removes that were accounted for in the pack have been indeed logged in the journal.
4. The change to PersistentQueue#setup is just for the tests

This doesn't fully address KEST-402. However this is necessary for the second part of the fix.

RB_ID=121133
  • Loading branch information
dhamanka committed Feb 28, 2013
1 parent 96b6719 commit 606786f
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 9 deletions.
6 changes: 5 additions & 1 deletion src/main/scala/net/lag/kestrel/Journal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecut
}

def replayFile(name: String, filename: String)(f: JournalItem => Unit): Unit = {
log.debug("Replaying '%s' file %s", name, filename)
log.info("Replaying '%s' file %s", name, filename)
size = 0
var lastUpdate = 0L
try {
Expand Down Expand Up @@ -492,6 +492,10 @@ class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecut
newJournal.dump(state.checkpoint.reservedItems, state.openItems, state.pentUpDeletes, state.queueState)
newJournal.close()

// Flush the updates to the current journal so that any removes that have been accounted
// for in the pack are persisted before the packed file replaces existing files
writer.fsync()

log.info("Packing '%s' -- erasing old files.", queueName)
val packFile = new File(queuePath, state.checkpoint.filename + ".pack")
tempFile.renameTo(packFile)
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/net/lag/kestrel/PeriodicSyncFile.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class PeriodicSyncFile(file: File, scheduler: ScheduledExecutorService, period:

@volatile var closed = false

private def fsync() {
def fsync() {
synchronized {
// race: we could underestimate the number of completed writes. that's okay.
val completed = promises.size
Expand Down
9 changes: 7 additions & 2 deletions src/main/scala/net/lag/kestrel/PersistentQueue.scala
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
_remove(false, None)
totalDiscarded.incr()
if (config.keepJournal) journal.remove()
fillReadBehind()
}

val item = QItem(addTime, adjustExpiry(Time.now, expiry), value, 0)
Expand Down Expand Up @@ -380,6 +381,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
val item = _remove(transaction, None)
if (config.keepJournal && item.isDefined) {
if (transaction) journal.removeTentative(item.get.xid) else journal.remove()
fillReadBehind()
checkRotateJournal()
}

Expand Down Expand Up @@ -523,6 +525,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
/**
 * Resets the in-memory counters and rebuilds the queue's state by replaying
 * the on-disk journal. Must run before the queue serves any requests.
 */
def setup(): Unit = synchronized {
  // Zero both counters first so the replayed adds/removes accumulate from a
  // clean slate rather than stacking on stale values.
  queueSize = 0
  queueLength = 0
  replayJournal()
}
Expand Down Expand Up @@ -655,7 +658,9 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
queueSize -= len
_memoryBytes -= len
queueLength -= 1
fillReadBehind()
if (journal.isReplaying) {
fillReadBehind()
}
_currentAge = item.addTime
if (transaction) {
item.xid = xid.getOrElse { nextXid() }
Expand All @@ -681,8 +686,8 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
queueSize -= len
_memoryBytes -= len
queueLength -= 1
fillReadBehind()
if (config.keepJournal) journal.remove()
fillReadBehind()
toRemove += item
} else {
continue = false
Expand Down
20 changes: 15 additions & 5 deletions src/test/scala/net/lag/kestrel/DumpJournal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,27 @@ package net.lag.kestrel
import java.io.File
import scala.collection.mutable
import com.twitter.util.TempFolder
import com.twitter.util.Time

trait DumpJournal { self: TempFolder =>
def dumpJournal(qname: String): String = {
def dumpJournal(qname: String, dumpTimestamps: Boolean = false): String = {
var rv = new mutable.ListBuffer[JournalItem]
new Journal(new File(folderName, qname).getCanonicalPath).replay { item => rv += item }
rv map {
case JournalItem.Add(item) =>
if (item.data.size > 0 && item.data(0) > 0) {
"add(%d:%d:%s)".format(item.data.size, item.xid, new String(item.data))
} else {
"add(%d:%d)".format(item.data.size, item.xid)
if (!dumpTimestamps) {
if (item.data.size > 0 && item.data(0) > 0) {
"add(%d:%d:%s)".format(item.data.size, item.xid, new String(item.data))
} else {
"add(%d:%d)".format(item.data.size, item.xid)
}
}
else {
if (item.data.size > 0 && item.data(0) > 0) {
"add(%d:%d:%s:%d:%d)".format(item.data.size, item.xid, new String(item.data), item.addTime.inMilliseconds, item.expiry.getOrElse(Time.fromMilliseconds(0)).inMilliseconds)
} else {
"add(%d:%d:%d:%d)".format(item.data.size, item.xid, item.addTime.inMilliseconds, item.expiry.getOrElse(Time.fromMilliseconds(0)).inMilliseconds)
}
}
case JournalItem.Remove => "remove"
case JournalItem.RemoveTentative(xid) => "remove-tentative(%d)".format(xid)
Expand Down
46 changes: 46 additions & 0 deletions src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ class PersistentQueueSpec extends Specification
val timer = new FakeTimer()
val scheduler = new ScheduledThreadPoolExecutor(1)

/**
 * Loan pattern: runs `f` with the shared journal packer started, and always
 * shuts the packer down afterwards, even if `f` throws.
 */
def withJournalPacker(f: => Unit): Unit = {
  Journal.packer.start()
  try f
  finally Journal.packer.shutdown()
}

doBefore {
timer.timerTask.cancelled = false
}
Expand Down Expand Up @@ -137,6 +146,43 @@ class PersistentQueueSpec extends Specification
}
}

// Verifies that a queue which rotates its journal and triggers a pack still
// replays to the correct length after being closed and reopened.
"rotate and pack journals" in {
  withTempFolder {
    withJournalPacker {
      // Tiny journal/memory limits force frequent rotation, read-behind,
      // and packing during the test.
      val config = new QueueBuilder {
        defaultJournalSize = 16.bytes
        maxJournalSize = 256.bytes
        maxMemorySize = 64.bytes
      }.apply()
      // Unique name per run so stale journal files from a previous run
      // cannot interfere with replay.
      val queueName = "rotate-pack-journal" + Time.now
      val q = new PersistentQueue(queueName, folderName, config, timer, scheduler)
      q.setup()

      // Set the checkpoint: fill past maxMemorySize so the queue enters
      // read-behind mode.
      (1 to 16).foreach { _ =>
        q.add(new Array[Byte](32))
      }
      q.length mustEqual 16

      // Interleave removes and adds to force journal rotation and packing.
      (1 to 15).foreach { _ =>
        q.remove
        q.add(new Array[Byte](32))
      }
      q.length mustEqual 16
      q.close()

      // Reopen and replay: the packed journal must reproduce all 16 items.
      val q2 = new PersistentQueue(queueName, folderName, config, timer, scheduler)
      q2.setup()
      if (q2.length != 16) {
        // If the queue length doesn't meet the expectations, dump the journal so
        // that we can debug the cause of the incorrect length
        dumpJournal(queueName) mustEqual ""
      }
      // Assert the expected length on the success path too, instead of
      // relying solely on the debug branch above.
      q2.length mustEqual 16
      // Close the reopened queue; the original test leaked q2's journal
      // file handles.
      q2.close()
    }
  }
}


"rewrite journals when they exceed the defaultJournalSize and are empty" in {
withTempFolder {
val config = new QueueBuilder {
Expand Down

0 comments on commit 606786f

Please sign in to comment.