Skip to content

Commit

Permalink
Merge branch 'develop' of github.com:PredictionIO/PredictionIO into d…
Browse files Browse the repository at this point in the history
…evelop
  • Loading branch information
Kenneth Chan committed Jan 12, 2014
2 parents a2cf93b + bda2a31 commit 2f00191
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 59 deletions.
41 changes: 41 additions & 0 deletions CONTRIBUTING.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
Contributing to PredictionIO
============================

Thank you for your interest in contributing to PredictionIO!

We are building this software together and strongly encourage contributions
from the community that are within the guidelines set forth below.


Bug Fixes and New Features
--------------------------

As your first step, please visit and read the
[Contributor License Agreement](http://prediction.io/cla). All contributors
are required to sign this agreement.


Before starting to write code, look for existing
[tickets](https://predictionio.atlassian.net/browse/PDIO) or
[create one](https://predictionio.atlassian.net/secure/CreateIssue!default.jspa)
for your bug, issue, or feature request. This helps the community
avoid working on something that might not be of interest or which
has already been addressed.


Pull Requests
-------------

PredictionIO follows the
[git-flow](http://nvie.com/posts/a-successful-git-branching-model/) model
where all active development goes to the develop branch, and releases go to
the master branch. Pull requests should be made against the develop branch
and include relevant tests, if applicable.


Talk To Us
----------

We love to hear from you. If you want to work on something or have
questions / feedback, please reach out to us at
https://groups.google.com/forum/#!forum/predictionio-dev
23 changes: 21 additions & 2 deletions commons/src/main/scala/io/prediction/commons/Config.scala
Original file line number Diff line number Diff line change
Expand Up @@ -634,6 +634,7 @@ class Config {
}
}

/** Obtains a generic ModelData object with configured backend type. */
def getModeldata(engineinfoid: String): modeldata.ModelData = {
modeldataDbType match {
case "mongodb" => {
Expand Down Expand Up @@ -671,7 +672,25 @@ class Config {
}
}

/** Obtains an ItemRecScores object with configured backend type. */
/** Obtains a generic ModelData object for training with configured backend type.
  *
  * Dispatches on the engine info ID so engine-specific score stores are
  * returned when available ("itemrec" and "itemsim"); any other engine type
  * receives a generic MongoDB-backed ModelData bound to the training database.
  *
  * @param engineinfoid the engine info ID ("itemrec", "itemsim", or other)
  * @return a ModelData implementation backed by the training model data store
  * @throws RuntimeException if the configured model data database type is unsupported
  */
def getModeldataTraining(engineinfoid: String): modeldata.ModelData = {
  modeldataDbType match {
    case "mongodb" => {
      // Capture the outer Config so the anonymous subclass below can close over it.
      val thisObj = this
      engineinfoid match {
        case "itemrec" => getModeldataTrainingItemRecScores
        case "itemsim" => getModeldataTrainingItemSimScores
        case _ => new modeldata.mongodb.MongoModelData {
          val config = thisObj
          val mongodb = modeldataTrainingMongoDb.get
        }
      }
    }
    // Report the value that was actually matched (modeldataDbType); the
    // original message printed modeldataTrainingDbType, which is not the
    // variable this match inspects and could mask the real misconfiguration.
    case _ => throw new RuntimeException("Invalid modeldata database type: " + modeldataDbType)
  }
}

/** Obtains an ItemRecScores object for training with configured backend type. */
def getModeldataTrainingItemRecScores(): modeldata.ItemRecScores = {
modeldataDbType match {
case "mongodb" => {
Expand All @@ -681,7 +700,7 @@ class Config {
}
}

/** Obtains an ItemSimScores object with configured backend type. */
/** Obtains an ItemSimScores object for training with configured backend type. */
def getModeldataTrainingItemSimScores(): modeldata.ItemSimScores = {
modeldataDbType match {
case "mongodb" => {
Expand Down
2 changes: 1 addition & 1 deletion dist/conf/init.json
Original file line number Diff line number Diff line change
Expand Up @@ -4111,7 +4111,7 @@
"paramsections": []
},
"pdio-itemsimcf": {
"name": "Item Simiarlity Collaborative Filtering",
"name": "Item Similarity Collaborative Filtering",
"description": "This algorithm predicts similar items which the user may also like.",
"batchcommands": [
"$hadoop$ jar $base$/lib/$pdioItemsimAlgo$ io.prediction.algorithms.scalding.itemsim.itemsimcf.DataPreparator --hdfs --dbType $appdataDbType$ --dbName $appdataDbName$ --dbHost $appdataDbHost$ --dbPort $appdataDbPort$ --hdfsRoot $hdfsRoot$ --appid $appid$ --engineid $engineid$ --algoid $algoid$ $itypes$ --viewParam $viewParam$ --likeParam $likeParam$ --dislikeParam $dislikeParam$ --conversionParam $conversionParam$ --conflictParam $conflictParam$",
Expand Down
89 changes: 33 additions & 56 deletions servers/scheduler/app/controllers/Jobs.scala
Original file line number Diff line number Diff line change
Expand Up @@ -196,8 +196,6 @@ class AlgoJob extends InterruptableJob {
val engines = Scheduler.engines
val algos = Scheduler.algos
val algoInfos = Scheduler.algoInfos
val itemRecScores = Scheduler.itemRecScores
val itemSimScores = Scheduler.itemSimScores
val logPrefix = s"Algo ID ${algoid}: "

algos.get(algoid) map { algo =>
Expand Down Expand Up @@ -286,23 +284,20 @@ class OfflineEvalJob extends InterruptableJob {

Some(steptype) collect {
case "split" => {
/** Delete old model data, if any (for recovering from an incomplete run, and clean old score for multi-iterations) */
Scheduler.offlineEvals.get(evalid) map { offlineEval =>
Scheduler.engines.get(offlineEval.engineid) map { engine =>
val algosToRun = Scheduler.algos.getByOfflineEvalid(offlineEval.id).toSeq
engine.infoid match {
case "itemrec" => algosToRun foreach { algo =>
Logger.info(s"${logPrefix}Algo ID ${algo.id}: Deleting any old model data")
Scheduler.trainingItemRecScores.deleteByAlgoid(algo.id)
}
case "itemsim" => algosToRun foreach { algo =>
if (iteration == 1) {
/** Delete old model data, if any (for recovering from an incomplete run, and clean old score for multi-iterations) */
Scheduler.offlineEvals.get(evalid) map { offlineEval =>
Scheduler.engines.get(offlineEval.engineid) map { engine =>
val algosToRun = Scheduler.algos.getByOfflineEvalid(offlineEval.id).toSeq
val modelData = Scheduler.config.getModeldataTraining(engine.infoid)
algosToRun foreach { algo =>
Logger.info(s"${logPrefix}Algo ID ${algo.id}: Deleting any old model data")
Scheduler.trainingItemSimScores.deleteByAlgoid(algo.id)
modelData.delete(algo.id, false)
}
Logger.info(s"${logPrefix}Deleting any old user-to-item actions")
Scheduler.appdataTrainingU2IActions.deleteByAppid(offlineEval.id)
Scheduler.appdataTestU2IActions.deleteByAppid(offlineEval.id)
}
Logger.info(s"${logPrefix}Deleting any old user-to-item actions")
Scheduler.appdataTrainingU2IActions.deleteByAppid(offlineEval.id)
Scheduler.appdataTestU2IActions.deleteByAppid(offlineEval.id)
}
}
if (iteration > 1) {
Expand Down Expand Up @@ -483,15 +478,10 @@ class OfflineEvalJob extends InterruptableJob {
}

/** Clean up */
engine.infoid match {
case "itemrec" => algosToRun foreach { algo =>
Logger.info(s"${logPrefix}Algo ID ${algo.id}: Deleting used model data")
Scheduler.trainingItemRecScores.deleteByAlgoid(algo.id)
}
case "itemsim" => algosToRun foreach { algo =>
Logger.info(s"${logPrefix}Algo ID ${algo.id}: Deleting used model data")
Scheduler.trainingItemSimScores.deleteByAlgoid(algo.id)
}
val modelData = config.getModeldataTraining(engine.infoid)
algosToRun foreach { algo =>
Logger.info(s"${logPrefix}Algo ID ${algo.id}: Deleting used model data")
modelData.delete(algo.id, false)
}
Logger.info(s"${logPrefix}Deleting used app data")
Scheduler.appdataTrainingUsers.deleteByAppid(offlineEval.id)
Expand Down Expand Up @@ -666,6 +656,7 @@ class OfflineTuneJob extends InterruptableJob {

offlineTunes.get(tuneid) map { offlineTune =>
engines.get(offlineTune.engineid) map { engine =>
val modelData = config.getModeldataTraining(engine.infoid)
apps.get(engine.appid) map { app =>
val totalLoops = offlineTune.loops
val offlineEvalsToRun = offlineEvals.getByTuneid(offlineTune.id).toSeq
Expand All @@ -686,29 +677,21 @@ class OfflineTuneJob extends InterruptableJob {

/** Delete old model data, if any (usually recovering from an incomplete run) */
val algosToClean = algos.getByOfflineEvalid(offlineEval.id).toSeq.filter(_.loop.map(_ != 0).getOrElse(false))
engine.infoid match {
case "itemrec" => algosToClean foreach { algo =>
Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Deleting any old model data")
Scheduler.trainingItemRecScores.deleteByAlgoid(algo.id)
algos.delete(algo.id)
}
case "itemsim" =>
algosToClean foreach { algo =>
Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Deleting any old model data")
Scheduler.trainingItemSimScores.deleteByAlgoid(algo.id)
algos.delete(algo.id)
}
Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Deleting any old app data")
Scheduler.appdataTrainingUsers.deleteByAppid(offlineEval.id)
Scheduler.appdataTrainingItems.deleteByAppid(offlineEval.id)
Scheduler.appdataTrainingU2IActions.deleteByAppid(offlineEval.id)
Scheduler.appdataTestUsers.deleteByAppid(offlineEval.id)
Scheduler.appdataTestItems.deleteByAppid(offlineEval.id)
Scheduler.appdataTestU2IActions.deleteByAppid(offlineEval.id)
Scheduler.appdataValidationUsers.deleteByAppid(offlineEval.id)
Scheduler.appdataValidationItems.deleteByAppid(offlineEval.id)
Scheduler.appdataValidationU2IActions.deleteByAppid(offlineEval.id)
algosToClean foreach { algo =>
Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Deleting any old model data")
modelData.delete(algo.id, false)
algos.delete(algo.id)
}
Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Deleting any old app data")
Scheduler.appdataTrainingUsers.deleteByAppid(offlineEval.id)
Scheduler.appdataTrainingItems.deleteByAppid(offlineEval.id)
Scheduler.appdataTrainingU2IActions.deleteByAppid(offlineEval.id)
Scheduler.appdataTestUsers.deleteByAppid(offlineEval.id)
Scheduler.appdataTestItems.deleteByAppid(offlineEval.id)
Scheduler.appdataTestU2IActions.deleteByAppid(offlineEval.id)
Scheduler.appdataValidationUsers.deleteByAppid(offlineEval.id)
Scheduler.appdataValidationItems.deleteByAppid(offlineEval.id)
Scheduler.appdataValidationU2IActions.deleteByAppid(offlineEval.id)

val currentIteration = 0
val iterationParam = collection.immutable.Map("iteration" -> currentIteration)
Expand Down Expand Up @@ -937,15 +920,9 @@ class OfflineTuneJob extends InterruptableJob {
/** Clean up */
offlineEvalsToRun foreach { offlineEval =>
val algosToClean = algos.getByOfflineEvalid(offlineEval.id).toSeq.filter(_.loop.map(_ != 0).getOrElse(false))
engine.infoid match {
case "itemrec" => algosToClean foreach { algo =>
Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Deleting used model data")
Scheduler.trainingItemRecScores.deleteByAlgoid(algo.id)
}
case "itemsim" => algosToClean foreach { algo =>
Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Deleting used model data")
Scheduler.trainingItemSimScores.deleteByAlgoid(algo.id)
}
algosToClean foreach { algo =>
Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Algo ID ${algo.id}: Deleting used model data")
modelData.delete(algo.id, false)
}
Logger.info(s"${logPrefix}OfflineEval ID ${offlineEval.id}: Deleting used app data")
Scheduler.appdataTrainingUsers.deleteByAppid(offlineEval.id)
Expand Down

0 comments on commit 2f00191

Please sign in to comment.