Commit 74f353c

Use pack for U2I training-test split wrapper

dszeto committed Mar 24, 2014
1 parent efb68b9 commit 74f353c

Showing 7 changed files with 175 additions and 143 deletions.
6 changes: 3 additions & 3 deletions bin/build.sh
@@ -35,9 +35,6 @@ if test "$SKIP_PROCESS" = "1" ; then
 else
 echo "+ Assemble Process Hadoop Scalding"
 BASE_TARGETS="$BASE_TARGETS processHadoopScalding/assembly"
-
-echo "+ Assemble Process Commons Evaluations Scala U2I Training-Test Splitter"
-BASE_TARGETS="$BASE_TARGETS processEnginesCommonsEvalScalaU2ITrainingTestSplit/assembly"
 fi
 
 # Build Non-distributed Random Algorithm
@@ -84,6 +81,9 @@ BASE_TARGETS="$BASE_TARGETS processEnginesCommonsEvalScalaParamGen/pack"
 echo "+ Pack Single Machine Top-K Collector"
 BASE_TARGETS="$BASE_TARGETS processEnginesCommonsEvalScalaTopKItems/pack"
 
+echo "+ Pack Process Commons Evaluations Scala U2I Training-Test Splitter Wrapper"
+BASE_TARGETS="$BASE_TARGETS processEnginesCommonsEvalScalaU2ITrainingTestSplit/pack"
+
 # Build connection check tool
 echo "+ Pack Connection Check Tool"
 BASE_TARGETS="$BASE_TARGETS toolsConncheck/pack"
4 changes: 3 additions & 1 deletion bin/package.sh
@@ -52,7 +52,6 @@ cp $BASE/bin/quiet.sh $PACKAGE_DIR/bin
 cp -R $DIST_DIR/conf $PACKAGE_DIR
 
 cp "$BASE/process/target/scala-2.10/predictionio-process-hadoop-scalding-assembly-$VERSION.jar" "$PACKAGE_DIR/lib"
-cp "$BASE/process/engines/commons/evaluations/scala/u2itrainingtestsplit/target/scala-2.10/predictionio-process-commons-evaluations-scala-u2itrainingtestsplittime-assembly-$VERSION.jar" "$PACKAGE_DIR/lib"
 
 cp $BASE/process/engines/commons/algorithms/scala/random/target/pack/bin/* $PACKAGE_DIR/bin
 cp -n $BASE/process/engines/commons/algorithms/scala/random/target/pack/lib/* $PACKAGE_DIR/lib
@@ -69,6 +68,9 @@ cp -n $BASE/process/engines/commons/evaluations/scala/topkitems/target/pack/lib/* $PACKAGE_DIR/lib
 cp $BASE/process/engines/commons/evaluations/scala/u2isplit/target/pack/bin/* $PACKAGE_DIR/bin
 cp -n $BASE/process/engines/commons/evaluations/scala/u2isplit/target/pack/lib/* $PACKAGE_DIR/lib
 
+cp $BASE/process/engines/commons/evaluations/scala/u2itrainingtestsplit/target/pack/bin/* $PACKAGE_DIR/bin
+cp -n $BASE/process/engines/commons/evaluations/scala/u2itrainingtestsplit/target/pack/lib/* $PACKAGE_DIR/lib
+
 cp $BASE/process/engines/itemrec/algorithms/scala/generic/target/pack/bin/* $PACKAGE_DIR/bin
 cp -n $BASE/process/engines/itemrec/algorithms/scala/generic/target/pack/lib/* $PACKAGE_DIR/lib
1 change: 1 addition & 0 deletions build.sbt
@@ -105,6 +105,7 @@ lazy val processEnginesCommonsEvalScalaU2ISplit = project
 lazy val processEnginesCommonsEvalScalaU2ITrainingTestSplit = project
   .in(file("process/engines/commons/evaluations/scala/u2itrainingtestsplit"))
   .dependsOn(commons)
+  .settings(scalariformSettings: _*)
 
 lazy val processEnginesItemRecAlgoHadoopScalding = project
   .in(file("process/engines/itemrec/algorithms/hadoop/scalding"))
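For reference, `scalariformSettings` comes from the sbt-scalariform plugin, and the `: _*` splat expands its `Seq[Setting[_]]` into the varargs `.settings(...)` call. A minimal sketch of the same pattern, assuming the sbt-scalariform 1.x import path (the project name here is hypothetical):

// Minimal sketch (sbt 0.13-era build.sbt): expanding a Seq of settings into
// the varargs .settings(...) call. Import path per sbt-scalariform 1.x.
import com.typesafe.sbt.SbtScalariform.scalariformSettings

lazy val example = project
  .in(file("example"))
  .settings(scalariformSettings: _*)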
5 changes: 1 addition & 4 deletions dist/conf/init.json
@@ -18,9 +18,6 @@
 },
 "jars.pdioCommonsEval": {
 "value": "predictionio-process-hadoop-scalding-assembly-0.7.0-SNAPSHOT.jar"
-},
-"jars.pdioCommonsU2ITrainingTestSplit": {
-"value": "predictionio-process-commons-evaluations-scala-u2itrainingtestsplittime-assembly-0.7.0-SNAPSHOT.jar"
 }
 },
 "engineinfos": {
@@ -7536,7 +7533,7 @@
 "itemrec", "itemsim"
 ],
 "commands": [
-"java -jar $base$/lib/$pdioCommonsU2ITrainingTestSplit$ --hadoop $hadoop$ --pdioEvalJar $base$/lib/$pdioCommonsEval$ --sequenceNum $iteration$ --hdfs --dbType $appdataDbType$ --dbName $appdataDbName$ --dbHost $appdataDbHost$ --dbPort $appdataDbPort$ --training_dbType $appdataTrainingDbType$ --training_dbName $appdataTrainingDbName$ --training_dbHost $appdataTrainingDbHost$ --training_dbPort $appdataTrainingDbPort$ --validation_dbType $appdataValidationDbType$ --validation_dbName $appdataValidationDbName$ --validation_dbHost $appdataValidationDbHost$ --validation_dbPort $appdataValidationDbPort$ --test_dbType $appdataTestDbType$ --test_dbName $appdataTestDbName$ --test_dbHost $appdataTestDbHost$ --test_dbPort $appdataTestDbPort$ --hdfsRoot $hdfsRoot$ --localTempRoot $localTempRoot$ --appid $appid$ --engineid $engineid$ --evalid $evalid$ $itypes$ --trainingPercent $trainingPercent$ --validationPercent $validationPercent$ --testPercent $testPercent$ --timeorder $timeorder$"
+"$base$/bin/u2itrainingtestsplit --hadoop $hadoop$ --pdioEvalJar $base$/lib/$pdioCommonsEval$ --sequenceNum $iteration$ --hdfs --dbType $appdataDbType$ --dbName $appdataDbName$ --dbHost $appdataDbHost$ --dbPort $appdataDbPort$ --training_dbType $appdataTrainingDbType$ --training_dbName $appdataTrainingDbName$ --training_dbHost $appdataTrainingDbHost$ --training_dbPort $appdataTrainingDbPort$ --validation_dbType $appdataValidationDbType$ --validation_dbName $appdataValidationDbName$ --validation_dbHost $appdataValidationDbHost$ --validation_dbPort $appdataValidationDbPort$ --test_dbType $appdataTestDbType$ --test_dbName $appdataTestDbName$ --test_dbHost $appdataTestDbHost$ --test_dbPort $appdataTestDbPort$ --hdfsRoot $hdfsRoot$ --localTempRoot $localTempRoot$ --appid $appid$ --engineid $engineid$ --evalid $evalid$ $itypes$ --trainingPercent $trainingPercent$ --validationPercent $validationPercent$ --testPercent $testPercent$ --timeorder $timeorder$"
 ],
 "params": {
 "timeorder": {
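The `$name$` tokens in the command above are placeholders that PredictionIO substitutes at scheduling time; the change simply swaps the `java -jar` invocation for the pack-generated launcher script while keeping every argument intact. Purely as an illustration (not PredictionIO's actual implementation), substitution of this style can be sketched in Scala:

// Toy illustration of $name$ placeholder substitution; PredictionIO's
// scheduler does the real work. All values below are hypothetical.
object PlaceholderDemo extends App {
  val params = Map("base" -> "/opt/predictionio", "hadoop" -> "/usr/bin/hadoop")
  val command = "$base$/bin/u2itrainingtestsplit --hadoop $hadoop$"
  // Replace each $key$ with its value from the params map.
  val resolved = """\$(\w+)\$""".r.replaceAllIn(command, m => params(m.group(1)))
  println(resolved) // /opt/predictionio/bin/u2itrainingtestsplit --hadoop /usr/bin/hadoop
}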
25 changes: 17 additions & 8 deletions process/engines/commons/evaluations/scala/u2itrainingtestsplit/build.sbt
@@ -1,12 +1,21 @@
-import AssemblyKeys._
+import xerial.sbt.Pack._
 
-assemblySettings
-
 name := "predictionio-process-commons-evaluations-scala-u2itrainingtestsplittime"
 
-libraryDependencies += "com.twitter" %% "scalding-args" % "0.8.6"
+libraryDependencies ++= Seq(
+  "ch.qos.logback" % "logback-classic" % "1.1.1",
+  "ch.qos.logback" % "logback-core" % "1.1.1",
+  "com.github.scopt" %% "scopt" % "3.2.0",
+  "org.clapper" %% "grizzled-slf4j" % "1.0.1")
 
-excludedJars in assembly <<= (fullClasspath in assembly) map { cp =>
-  val excludes = Set("minlog-1.2.jar")
-  cp filter { jar => excludes(jar.data.getName)}
-}
+packSettings
+
+packJarNameConvention := "full"
+
+packExpandedClasspath := true
+
+packGenerateWindowsBatFile := false
+
+packMain := Map("u2itrainingtestsplit" -> "io.prediction.evaluations.commons.trainingtestsplit.U2ITrainingTestSplitTime")
+
+packJvmOpts := Map("u2itrainingtestsplit" -> Common.packCommonJvmOpts)
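With these settings, running `sbt processEnginesCommonsEvalScalaU2ITrainingTestSplit/pack` stages dependency JARs and a generated `target/pack/bin/u2itrainingtestsplit` launcher, which is what bin/package.sh copies above. `Common.packCommonJvmOpts` refers to a shared helper defined outside this diff; a hypothetical sketch of what such a helper could look like, as an assumption only since its real contents are not shown here:

// Hypothetical sketch only: the real Common object lives outside this diff.
// sbt-pack expands ${PROG_HOME} in launcher scripts to the install directory.
object Common {
  // JVM options applied to every pack-generated launcher.
  val packCommonJvmOpts: Seq[String] =
    Seq("-Dconfig.file=${PROG_HOME}/conf/predictionio.conf")
}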
150 changes: 150 additions & 0 deletions process/engines/commons/evaluations/scala/u2itrainingtestsplit/src/main/scala/io/prediction/evaluations/commons/trainingtestsplit/U2ITrainingTestSplitTime.scala
@@ -0,0 +1,150 @@
package io.prediction.evaluations.commons.trainingtestsplit

import io.prediction.commons.filepath.U2ITrainingTestSplitFile

import java.io.File
import scala.io.Source
import scala.sys.process._

import grizzled.slf4j.Logger

case class U2ITrainingTestSplitTimeConfig(
  hadoop: String = "",
  pdioEvalJar: String = "",
  hdfsRoot: String = "",
  localTempRoot: String = "",
  appid: Int = 0,
  engineid: Int = 0,
  evalid: Int = 0,
  sequenceNum: Int = 0)

/**
 * Wrapper for the Scalding U2ITrainingTestSplitTime job.
 *
 * Args:
 * --hadoop <string> hadoop command
 * --pdioEvalJar <string> the name of the Scalding U2ITrainingTestSplit job JAR
 * --sequenceNum <int> the sequence number (starts from 1 for the first iteration and increments for later iterations)
 *
 * --dbType: <string> appdata DB type
 * --dbName: <string>
 * --dbHost: <string>. optional. (e.g. "127.0.0.1")
 * --dbPort: <int>. optional. (e.g. 27017)
 *
 * --training_dbType: <string> training_appdata DB type
 * --training_dbName: <string>
 * --training_dbHost: <string>. optional
 * --training_dbPort: <int>. optional
 *
 * --validation_dbType: <string> validation_appdata DB type
 * --validation_dbName: <string>
 * --validation_dbHost: <string>. optional
 * --validation_dbPort: <int>. optional
 *
 * --test_dbType: <string> test_appdata DB type
 * --test_dbName: <string>
 * --test_dbHost: <string>. optional
 * --test_dbPort: <int>. optional
 *
 * --hdfsRoot: <string>. PredictionIO root path in HDFS
 * --localTempRoot: <string>. local directory for temporary storage
 *
 * --appid: <int>
 * --engineid: <int>
 * --evalid: <int>
 *
 * --itypes: <string separated by white space>. e.g. "--itypes type1 type2". If --itypes is not specified, ALL itypes will be used.
 *
 * --trainingPercent: <double> (0.01 to 1). training set percentage
 * --validationPercent: <double> (0.01 to 1). validation set percentage
 * --testPercent: <double> (0.01 to 1). test set percentage
 * --timeorder: <boolean>. requires total percentage < 1
 */
object U2ITrainingTestSplitTime {
  def main(args: Array[String]) {
    val parser = new scopt.OptionParser[U2ITrainingTestSplitTimeConfig]("u2itrainingtestsplit") {
      head("u2itrainingtestsplit")
      opt[String]("hadoop") required () action { (x, c) =>
        c.copy(hadoop = x)
      } text ("path to the 'hadoop' command")
      opt[String]("pdioEvalJar") required () action { (x, c) =>
        c.copy(pdioEvalJar = x)
      } text ("path to PredictionIO Hadoop job JAR")
      opt[String]("hdfsRoot") required () action { (x, c) =>
        c.copy(hdfsRoot = x)
      } text ("PredictionIO root path in HDFS")
      opt[String]("localTempRoot") required () action { (x, c) =>
        c.copy(localTempRoot = x)
      } text ("local directory for temporary storage")
      opt[Int]("appid") required () action { (x, c) =>
        c.copy(appid = x)
      } text ("the App ID of this offline evaluation")
      opt[Int]("engineid") required () action { (x, c) =>
        c.copy(engineid = x)
      } text ("the Engine ID of this offline evaluation")
      opt[Int]("evalid") required () action { (x, c) =>
        c.copy(evalid = x)
      } text ("the OfflineEval ID of this offline evaluation")
      opt[Int]("sequenceNum") required () action { (x, c) =>
        c.copy(sequenceNum = x)
      } validate { x =>
        if (x >= 1) success else failure("--sequenceNum must be >= 1")
      } text ("sequence (iteration) number of the offline evaluation")
      // The DB and percentage options documented above are not declared here;
      // they are forwarded verbatim to the underlying Hadoop jobs.
      override def errorOnUnknownArgument = false
    }
    val logger = Logger(U2ITrainingTestSplitTime.getClass)

    parser.parse(args, U2ITrainingTestSplitTimeConfig()) map { config =>
      val hadoop = config.hadoop
      val pdioEvalJar = config.pdioEvalJar
      val hdfsRoot = config.hdfsRoot
      val localTempRoot = config.localTempRoot
      val appid = config.appid
      val engineid = config.engineid
      val evalid = config.evalid
      val sequenceNum = config.sequenceNum
      // Forward the original command line to the underlying jobs. Note that
      // Array.toString would yield a JVM object reference, hence mkString.
      val argsString = args.mkString(" ")
      val resplit = sequenceNum > 1

      // Run the preparation job (which produces u2iCount.tsv, read below)
      // only on the first iteration.
      if (!resplit) {
        val splitPrepCmd = hadoop + " jar " + pdioEvalJar + " io.prediction.evaluations.scalding.commons.u2itrainingtestsplit.U2ITrainingTestSplitTimePrep " + argsString
        executeCommandAndCheck(splitPrepCmd)
      }

      // copy the count to local tmp
      val hdfsCountPath = U2ITrainingTestSplitFile(hdfsRoot, appid, engineid, evalid, "u2iCount.tsv")
      val localCountPath = localTempRoot + "eval-" + evalid + "-u2iCount.tsv"
      val localCountFile = new File(localCountPath)

      // create parent dir
      localCountFile.getParentFile().mkdirs()

      // delete existing file first
      if (localCountFile.exists()) localCountFile.delete()

      // get the count from hdfs
      val getHdfsCountCmd = hadoop + " fs -getmerge " + hdfsCountPath + " " + localCountPath
      executeCommandAndCheck(getHdfsCountCmd)

      // read the local file and get the count
      val lines = Source.fromFile(localCountPath).getLines
      if (lines.isEmpty) throw new RuntimeException(s"Count file $localCountPath is empty")

      val count = lines.next

      // split
      val splitCmd = hadoop + " jar " + pdioEvalJar + " io.prediction.evaluations.scalding.commons.u2itrainingtestsplit.U2ITrainingTestSplitTime " + argsString + " --totalCount " + count
      executeCommandAndCheck(splitCmd)

      // delete local tmp file
      logger.info(s"Deleting temporary file $localCountPath...")
      localCountFile.delete()
    }

    def executeCommandAndCheck(cmd: String) = {
      logger.info(s"Executing $cmd...")
      if ((cmd.!) != 0) throw new RuntimeException(s"Failed to execute '$cmd'")
    }
  }
}
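To tie the pieces together, here is a hedged example of invoking the pack-generated launcher for a first iteration, showing only the wrapper's required options; every path and ID below is hypothetical, and in practice the scheduler also appends the DB and percentage options that the wrapper forwards to the Hadoop jobs:

// Illustrative invocation via scala.sys.process; all paths and IDs are made up.
import scala.sys.process._

object LauncherDemo extends App {
  val exit = Seq(
    "/opt/predictionio/bin/u2itrainingtestsplit",
    "--hadoop", "/usr/bin/hadoop",
    "--pdioEvalJar", "/opt/predictionio/lib/predictionio-process-hadoop-scalding-assembly-0.7.0-SNAPSHOT.jar",
    "--hdfsRoot", "/predictionio/",
    "--localTempRoot", "/tmp/",
    "--appid", "1",
    "--engineid", "1",
    "--evalid", "1",
    "--sequenceNum", "1").!  // returns the process exit code
  if (exit != 0) sys.error("u2itrainingtestsplit failed")
}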

127 changes: 0 additions & 127 deletions
This file was deleted.