Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Tests the replay fixes for remove tentative
Browse files Browse the repository at this point in the history
Added new test that verifies the replay fixes when remove tentative/confirm remove is used in place of unconditional removes. This also exercises the open transactions logic in the checkpoint creation and packing of journals.

RB_ID=132406
  • Loading branch information
dhamanka committed Mar 17, 2013
1 parent ea5fee3 commit 453b755
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 21 deletions.
8 changes: 4 additions & 4 deletions src/main/scala/net/lag/kestrel/Journal.scala
Original file line number Diff line number Diff line change
Expand Up @@ -510,21 +510,21 @@ class Journal(queuePath: File, queueName: String, syncScheduler: ScheduledExecut
checkpoint = None
}

def archivedJournalFiles(): List[String] = {
private def archivedJournalFiles(): List[String] = {
this.synchronized {
Journal.archivedFilesForQueue(queuePath, queueName)
}
}

def allJournalFiles(): List[String] = {
private def allJournalFiles(): List[String] = {
archivedJournalFiles() ++ List(queueName)
}

def journalFilesBefore(filename: String): Seq[String] = {
private def journalFilesBefore(filename: String): Seq[String] = {
allJournalFiles().takeWhile { _ != filename }
}

def journalFilesAfter(filename: String): Option[String] = {
private def journalFilesAfter(filename: String): Option[String] = {
allJournalFiles().dropWhile { _ != filename }.drop(1).headOption
}
}
Expand Down
74 changes: 57 additions & 17 deletions src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package net.lag.kestrel
import java.io.{File, FileInputStream}
import java.util.concurrent.{CountDownLatch, ScheduledThreadPoolExecutor}
import scala.collection.mutable
import scala.util.Random
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
import com.twitter.ostrich.stats.Stats
Expand Down Expand Up @@ -48,6 +49,14 @@ class PersistentQueueSpec extends Specification
}
}

def verifyQLengthAndDumpJournal (q: PersistentQueue, queueName: String, expectedLength: Int) {
if (q.length != expectedLength) {
// If the queue length doesn't meet the expectations, dump the journal so
// that we can debug the cause of the incorrect length
(q.length, dumpJournal(queueName)) mustEqual (expectedLength, "")
}
}

doBefore {
timer.timerTask.cancelled = false
}
Expand Down Expand Up @@ -154,7 +163,7 @@ class PersistentQueueSpec extends Specification
maxJournalSize = 256.bytes
maxMemorySize = 64.bytes
}.apply()
val queueName = "rotate-pack-journal" + Time.now
val queueName = "rotate-pack-journal" + Random.nextInt
val q = new PersistentQueue(queueName, folderName, config, timer, scheduler)
q.setup()

Expand All @@ -173,11 +182,7 @@ class PersistentQueueSpec extends Specification

val q2 = new PersistentQueue(queueName, folderName, config, timer, scheduler)
q2.setup()
if (q2.length != 16) {
// If the queue length doesn't meet the expectations, dump the journal so
// that we can debug the cause of the incorrect length
dumpJournal(queueName) mustEqual ""
}
verifyQLengthAndDumpJournal(q2, queueName, 16)
}
}
}
Expand All @@ -190,7 +195,7 @@ class PersistentQueueSpec extends Specification
maxJournalSize = 256.bytes
maxMemorySize = 64.bytes
}.apply()
val queueName = "rotate-pack-journal-recover" + Time.now
val queueName = "rotate-pack-journal-recover" + Random.nextInt
val q = new PersistentQueue(queueName, folderName, config, timer, scheduler)
q.setup()

Expand All @@ -209,11 +214,7 @@ class PersistentQueueSpec extends Specification

val q2 = new PersistentQueue(queueName, folderName, config, timer, scheduler)
q2.setup()
if (q2.length != 16) {
// If the queue length doesn't meet the expectations, dump the journal so
// that we can debug the cause of the incorrect length
dumpJournal(queueName) mustEqual ""
}
verifyQLengthAndDumpJournal(q2, queueName, 16)

(1 to 31).foreach { _ =>
q2.remove
Expand All @@ -224,15 +225,54 @@ class PersistentQueueSpec extends Specification

val q3 = new PersistentQueue(queueName, folderName, config, timer, scheduler)
q3.setup()
if (q3.length != 16) {
// If the queue length doesn't meet the expectations, dump the journal so
// that we can debug the cause of the incorrect length
dumpJournal(queueName) mustEqual ""
}
verifyQLengthAndDumpJournal(q3, queueName, 16)
}
}
}

"rotate pack journals with remove tentative and then recover" in {
withTempFolder {
withJournalPacker {
val config = new QueueBuilder {
defaultJournalSize = 16.bytes
maxJournalSize = 256.bytes
maxMemorySize = 64.bytes
}.apply()
val queueName = "rotate-pack-journal-recover-tentative" + Random.nextInt
val q = new PersistentQueue(queueName, folderName, config, timer, scheduler)
q.setup()

// Set the checkpoint
(1 to 16).foreach { _ =>
q.add(new Array[Byte](32))
}
q.length mustEqual 16

(1 to 10).foreach { _ =>
val item = q.remove(true).get
q.confirmRemove(item.xid)
q.add(new Array[Byte](32))
}
q.length mustEqual 16
q.close()

val q2 = new PersistentQueue(queueName, folderName, config, timer, scheduler)
q2.setup()
verifyQLengthAndDumpJournal(q2, queueName, 16)

(1 to 31).foreach { _ =>
q2.remove
q2.add(new Array[Byte](32))
}
q2.length mustEqual 16
q2.close()

val q3 = new PersistentQueue(queueName, folderName, config, timer, scheduler)
q3.setup()
verifyQLengthAndDumpJournal(q3, queueName, 16)
}
}
}

"rewrite journals when they exceed the defaultJournalSize and are empty" in {
withTempFolder {
Expand Down

0 comments on commit 453b755

Please sign in to comment.