Skip to content

Commit

Permalink
Use journaled write concerns in MongoDB to guarantee successful writes
Browse files Browse the repository at this point in the history
  • Loading branch information
dszeto committed Mar 28, 2014
1 parent 305c2a9 commit aa0283e
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package io.prediction.commons.settings.mongodb

import io.prediction.commons.settings.{ App, Apps }

import com.mongodb.casbah.Imports._
import com.mongodb.casbah.WriteConcern

/** MongoDB implementation of Apps. */
class MongoApps(db: MongoDB) extends Apps {
Expand All @@ -10,6 +12,8 @@ class MongoApps(db: MongoDB) extends Apps {
private val seq = new MongoSequences(db)
private val getFields = MongoDBObject("userid" -> 1, "appkey" -> 1, "display" -> 1, "url" -> 1, "cat" -> 1, "desc" -> 1, "timezone" -> 1)

appColl.setWriteConcern(WriteConcern.JournalSafe)

private def dbObjToApp(dbObj: DBObject) = {
App(
id = dbObj.as[Int]("_id"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,18 @@ package io.prediction.commons.settings.mongodb

import io.prediction.commons.MongoUtils
import io.prediction.commons.settings.{ User, Users }

import com.mongodb.casbah.Imports._
import com.mongodb.casbah.WriteConcern

/** MongoDB implementation of Users. */
class MongoUsers(db: MongoDB) extends Users {
private val emptyObj = MongoDBObject()
private val userColl = db("users")
private val seq = new MongoSequences(db)

userColl.setWriteConcern(WriteConcern.JournalSafe)

def authenticate(id: Int, password: String) = {
userColl.findOne(MongoDBObject("_id" -> id, "password" -> password) ++ ("confirm" $exists false)) map { _ => true } getOrElse false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class AppsSpec extends Specification {
val userid = 2345
val dummy = dummyApp(0, userid, "getByAppkey")
val id = apps.insert(dummy)
apps.getByAppkey("getByAppkey") must beSome(dummy.copy(id = id)).eventually(60, 1000.millis)
apps.getByAppkey("getByAppkey") must beSome(dummy.copy(id = id))
}

def getByAppkeyNonExist(apps: Apps) = {
Expand All @@ -83,7 +83,7 @@ class AppsSpec extends Specification {
val userid = 689
val dummy = dummyApp(0, userid, name)
val id = apps.insert(dummy)
apps.getByAppkeyAndUserid(name, userid) must beSome(dummy.copy(id = id)).eventually(60, 1000.millis)
apps.getByAppkeyAndUserid(name, userid) must beSome(dummy.copy(id = id))
}

def getByAppkeyAndUseridNonExist(apps: Apps) = {
Expand All @@ -97,7 +97,7 @@ class AppsSpec extends Specification {
val userid = 12
val dummy = dummyApp(0, userid, name)
val id = apps.insert(dummy)
apps.getByIdAndUserid(id, userid) must beSome(dummy.copy(id = id)).eventually(60, 1000.millis)
apps.getByIdAndUserid(id, userid) must beSome(dummy.copy(id = id))
}

def getByIdAndUseridNonExist(apps: Apps) = {
Expand All @@ -112,14 +112,14 @@ class AppsSpec extends Specification {
val userid = 34
val id = apps.insert(dummyApp(0, userid, name))
apps.deleteByIdAndUserid(id, userid)
apps.getByIdAndUserid(id, userid) must beNone.eventually(60, 1000.millis)
apps.getByIdAndUserid(id, userid) must beNone
}

def existsByIdAndAppkeyAndUserid(apps: Apps) = {
val name = "existsByIdAndAppkeyAndUserid"
val userid = 45
val id = apps.insert(dummyApp(0, userid, name))
apps.existsByIdAndAppkeyAndUserid(id, name, userid) must beTrue.eventually(60, 1000.millis)
apps.existsByIdAndAppkeyAndUserid(id, name, userid) must beTrue
}

def update(apps: Apps) = {
Expand All @@ -128,7 +128,7 @@ class AppsSpec extends Specification {
val id = apps.insert(dummyApp(0, userid, name))
val updated = dummyApp(id, 67, "updated")
apps.update(updated)
apps.getByIdAndUserid(id, 67) must beSome(updated).eventually(60, 1000.millis)
apps.getByIdAndUserid(id, 67) must beSome(updated)
}

def updateAppkeyByAppkeyAndUserid(apps: Apps) = {
Expand All @@ -137,7 +137,7 @@ class AppsSpec extends Specification {
val id = apps.insert(dummyApp(0, userid, name))
val updated = dummyApp(id, 67, "updated")
apps.updateAppkeyByAppkeyAndUserid(name, userid, "updatedAppkey")
apps.existsByIdAndAppkeyAndUserid(id, "updatedAppkey", userid) must beTrue.eventually(60, 1000.millis)
apps.existsByIdAndAppkeyAndUserid(id, "updatedAppkey", userid) must beTrue
}

def updateTimezoneByAppkeyAndUserid(apps: Apps) = {
Expand All @@ -155,7 +155,7 @@ class AppsSpec extends Specification {
timezone = "US/Pacific"
)
apps.updateTimezoneByAppkeyAndUserid(name, userid, "US/Pacific")
apps.getByAppkey(name) must beSome(updated).eventually(60, 1000.millis)
apps.getByAppkey(name) must beSome(updated)
}

def backuprestore(apps: Apps) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class UsersSpec extends Specification {
lastname = Option(name),
confirm = name
)
users.emailExists(name + "@prediction.io") must beTrue.eventually(60, 1000.millis)
users.emailExists(name + "@prediction.io") must beTrue
}

def get(users: Users) = {
Expand All @@ -71,7 +71,7 @@ class UsersSpec extends Specification {
lastname = Option(name),
confirm = name
)
users.get(id) must beSome(User(id, name, Option(name), name + "@prediction.io", name, Some(name))).eventually(60, 1000.millis)
users.get(id) must beSome(User(id, name, Option(name), name + "@prediction.io", name, Some(name)))
}

def getByEmail(users: Users) = {
Expand All @@ -83,7 +83,7 @@ class UsersSpec extends Specification {
lastname = None,
confirm = name
)
users.getByEmail(name + "@prediction.io") must beSome(User(id, name, None, name + "@prediction.io", name, Some(name))).eventually(60, 1000.millis)
users.getByEmail(name + "@prediction.io") must beSome(User(id, name, None, name + "@prediction.io", name, Some(name)))
}

def emailExists(users: Users) = {
Expand All @@ -96,7 +96,7 @@ class UsersSpec extends Specification {
lastname = None,
confirm = dummy
)
users.emailExists(email) must beTrue.eventually(60, 1000.millis)
users.emailExists(email) must beTrue
}

def emailExistsNonExist(users: Users) = {
Expand All @@ -112,7 +112,7 @@ class UsersSpec extends Specification {
lastname = Option(name),
confirm = name
)
users.idAndEmailExists(id, name + "@prediction.io") must beTrue.eventually(60, 1000.millis)
users.idAndEmailExists(id, name + "@prediction.io") must beTrue
}

def idAndEmailExistsNonExist(users: Users) = {
Expand Down Expand Up @@ -145,7 +145,7 @@ class UsersSpec extends Specification {
confirm = name
)
users.confirm(name)
users.authenticate(id, name) must beTrue.eventually(60, 1000.millis)
users.authenticate(id, name) must beTrue
}

def authenticateByEmailNonExist(users: Users) = {
Expand Down Expand Up @@ -174,7 +174,7 @@ class UsersSpec extends Specification {
confirm = name
)
users.confirm(name)
users.authenticateByEmail(name + "@prediction.io", name) must beSome(id).eventually(60, 1000.millis)
users.authenticateByEmail(name + "@prediction.io", name) must beSome(id)
}

def confirmNonExist(users: Users) = {
Expand Down Expand Up @@ -202,7 +202,7 @@ class UsersSpec extends Specification {
confirm = "updateEmail"
)
users.updateEmail(id, "[email protected]")
users.get(id) must beSome(User(id, "updateEmail", None, "[email protected]", "updateEmail", Some("updateEmail"))).eventually(60, 1000.millis)
users.get(id) must beSome(User(id, "updateEmail", None, "[email protected]", "updateEmail", Some("updateEmail")))
}

def updatePassword(users: Users) = {
Expand All @@ -215,7 +215,7 @@ class UsersSpec extends Specification {
)
users.confirm("updatePassword")
users.updatePassword(id, "updatePasswordUpdated")
users.authenticate(id, "updatePasswordUpdated") must beTrue.eventually(60, 1000.millis)
users.authenticate(id, "updatePasswordUpdated") must beTrue
}

def updatePasswordByEmail(users: Users) = {
Expand All @@ -228,7 +228,7 @@ class UsersSpec extends Specification {
)
users.confirm("updatePasswordByEmail")
users.updatePasswordByEmail("[email protected]", "updatePasswordByEmailUpdated")
users.authenticate(id, "updatePasswordByEmailUpdated") must beTrue.eventually(60, 1000.millis)
users.authenticate(id, "updatePasswordByEmailUpdated") must beTrue
}

def update(users: Users) = {
Expand All @@ -241,7 +241,7 @@ class UsersSpec extends Specification {
)
val updatedUser = User(id, "updated", None, "[email protected]")
users.update(updatedUser)
users.get(id) must beSome(updatedUser).eventually(60, 1000.millis)
users.get(id) must beSome(updatedUser)
}

def backuprestore(users: Users) = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,10 @@ class U2ITrainingTestSplitTimeTest extends Specification with TupleConversions {

"all sets are mutually exclusive" in {
// make sure all 3 sinks are flushed
(results("training") must not(containAnyOf(results("validation"))).eventually(10, 1000.millis)) and
(results("training") must not(containAnyOf(results("test"))).eventually(10, 1000.millis)) and
(results("validation") must not(containAnyOf(results("test"))).eventually(10, 1000.millis))
(results.keys.size must be_==(3).eventually(60, 1000.millis)) and
(results("training") must not(containAnyOf(results("validation")))) and
(results("training") must not(containAnyOf(results("test")))) and
(results("validation") must not(containAnyOf(results("test"))))
}

def getTimeOnly(dataSet: List[(String, String, String, String, String)]): List[Long] = {
Expand Down

0 comments on commit aa0283e

Please sign in to comment.