[SPARK-21088][ML] CrossValidator, TrainValidationSplit support collect all models when fitting: Python API

## What changes were proposed in this pull request?

Add a Python API for collecting sub-models during CrossValidator/TrainValidationSplit fitting.
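
For reference, here is a minimal sketch of the new Python API, mirroring the unit tests added in this patch; it assumes an existing `SparkSession` bound to `spark`.

```python
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.linalg import Vectors
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

# Toy binary-classification dataset (assumes an active SparkSession named `spark`).
dataset = spark.createDataFrame(
    [(Vectors.dense([0.0]), 0.0),
     (Vectors.dense([0.5]), 0.0),
     (Vectors.dense([1.0]), 1.0)] * 10,
    ["features", "label"])

lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()

# collectSubModels=True keeps every model trained during tuning; for large models
# this can use a lot of driver memory.
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                    numFolds=3, collectSubModels=True)
cvModel = cv.fit(dataset)

# subModels is a list of lists: subModels[fold][index into estimatorParamMaps]
print(len(cvModel.subModels))      # 3 folds
print(len(cvModel.subModels[0]))   # 2 param maps
```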

## How was this patch tested?

Unit tests added.

Author: WeichenXu <[email protected]>

Closes apache#19627 from WeichenXu123/expose-model-list-py.
WeichenXu123 authored and jkbradley committed Apr 16, 2018
1 parent 5003736 commit 0461482
Showing 7 changed files with 211 additions and 29 deletions.
@@ -270,6 +270,17 @@ class CrossValidatorModel private[ml] (
this
}

// A Python-friendly auxiliary method
private[tuning] def setSubModels(subModels: JList[JList[Model[_]]])
: CrossValidatorModel = {
_subModels = if (subModels != null) {
Some(subModels.asScala.toArray.map(_.asScala.toArray))
} else {
None
}
this
}
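
The setter above is marked Python-friendly because it accepts `java.util.List`s instead of Scala arrays. As a rough illustration (not the actual PySpark wrapper code), the Python `CrossValidatorModel` could hand its sub-models back to the JVM model along these lines; the helper name and the per-model `_to_java()` hook are assumptions, only `setSubModels` comes from this patch.

```python
# Hypothetical sketch: forwarding Python-side sub-models to the JVM setter above.
def _attach_sub_models(py_cv_model, java_cv_model):
    if py_cv_model.subModels is not None:
        # Nested list: outer index = fold, inner index = position in estimatorParamMaps.
        # Py4J can convert the Python lists to java.util.Lists for setSubModels.
        java_sub_models = [[m._to_java() for m in fold]
                           for fold in py_cv_model.subModels]
        java_cv_model.setSubModels(java_sub_models)
    return java_cv_model
```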

/**
* @return submodels represented in a two-dimensional array. The index of the outer array is the
* fold index, and the index of the inner array corresponds to the ordering of
@@ -262,6 +262,17 @@ class TrainValidationSplitModel private[ml] (
this
}

// A Python-friendly auxiliary method
private[tuning] def setSubModels(subModels: JList[Model[_]])
: TrainValidationSplitModel = {
_subModels = if (subModels != null) {
Some(subModels.asScala.toArray)
} else {
None
}
this
}
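
`TrainValidationSplitModel` has a single train/validation split, so its sub-models form a flat list indexed by position in `estimatorParamMaps`. A short sketch, reusing `lr`, `grid`, `evaluator`, and `dataset` from the CrossValidator example above (assumed to be in scope):

```python
from pyspark.ml.tuning import TrainValidationSplit

tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
                           collectSubModels=True)
tvsModel = tvs.fit(dataset)

# Flat list: subModels[i] was trained with estimatorParamMaps[i].
assert len(tvsModel.subModels) == len(grid)
```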

/**
* @return submodels represented in an array. The index of the array corresponds to the ordering of
* estimatorParamMaps
5 changes: 5 additions & 0 deletions python/pyspark/ml/param/_shared_params_code_gen.py
@@ -157,6 +157,11 @@ def get$Name(self):
"TypeConverters.toInt"),
("parallelism", "the number of threads to use when running parallel algorithms (>= 1).",
"1", "TypeConverters.toInt"),
("collectSubModels", "Param for whether to collect a list of sub-models trained during " +
"tuning. If set to false, then only the single best sub-model will be available after " +
"fitting. If set to true, then all sub-models will be available. Warning: For large " +
"models, collecting all sub-models can cause OOMs on the Spark driver.",
"False", "TypeConverters.toBoolean"),
("loss", "the loss function to be optimized.", None, "TypeConverters.toString")]

code = []
24 changes: 24 additions & 0 deletions python/pyspark/ml/param/shared.py
@@ -655,6 +655,30 @@ def getParallelism(self):
return self.getOrDefault(self.parallelism)


class HasCollectSubModels(Params):
"""
Mixin for param collectSubModels: Param for whether to collect a list of sub-models trained during tuning. If set to false, then only the single best sub-model will be available after fitting. If set to true, then all sub-models will be available. Warning: For large models, collecting all sub-models can cause OOMs on the Spark driver.
"""

collectSubModels = Param(Params._dummy(), "collectSubModels", "Param for whether to collect a list of sub-models trained during tuning. If set to false, then only the single best sub-model will be available after fitting. If set to true, then all sub-models will be available. Warning: For large models, collecting all sub-models can cause OOMs on the Spark driver.", typeConverter=TypeConverters.toBoolean)

def __init__(self):
super(HasCollectSubModels, self).__init__()
self._setDefault(collectSubModels=False)

def setCollectSubModels(self, value):
"""
Sets the value of :py:attr:`collectSubModels`.
"""
return self._set(collectSubModels=value)

def getCollectSubModels(self):
"""
Gets the value of collectSubModels or its default value.
"""
return self.getOrDefault(self.collectSubModels)
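
Both `CrossValidator` and `TrainValidationSplit` are expected to pick this param up by mixing in the generated class (their `tuning.py` changes are not expanded in this excerpt). Assuming that mixin, the standard getter/setter pattern works as usual:

```python
from pyspark.ml.tuning import CrossValidator

cv = CrossValidator()
print(cv.getCollectSubModels())   # False, the default set in HasCollectSubModels.__init__
cv.setCollectSubModels(True)      # setter provided by the mixin
print(cv.getCollectSubModels())   # True
```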


class HasLoss(Params):
"""
Mixin for param loss: the loss function to be optimized.
78 changes: 78 additions & 0 deletions python/pyspark/ml/tests.py
@@ -1018,6 +1018,50 @@ def test_parallel_evaluation(self):
cvParallelModel = cv.fit(dataset)
self.assertEqual(cvSerialModel.avgMetrics, cvParallelModel.avgMetrics)

def test_expose_sub_models(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])

lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()

numFolds = 3
cv = CrossValidator(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
numFolds=numFolds, collectSubModels=True)

def checkSubModels(subModels):
self.assertEqual(len(subModels), numFolds)
for i in range(numFolds):
self.assertEqual(len(subModels[i]), len(grid))

cvModel = cv.fit(dataset)
checkSubModels(cvModel.subModels)

# Test that the default value of the "persistSubModels" write option is "true"
testSubPath = temp_path + "/testCrossValidatorSubModels"
savingPathWithSubModels = testSubPath + "cvModel3"
cvModel.save(savingPathWithSubModels)
cvModel3 = CrossValidatorModel.load(savingPathWithSubModels)
checkSubModels(cvModel3.subModels)
cvModel4 = cvModel3.copy()
checkSubModels(cvModel4.subModels)

savingPathWithoutSubModels = testSubPath + "cvModel2"
cvModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
cvModel2 = CrossValidatorModel.load(savingPathWithoutSubModels)
self.assertEqual(cvModel2.subModels, None)

for i in range(numFolds):
for j in range(len(grid)):
self.assertEqual(cvModel.subModels[i][j].uid, cvModel3.subModels[i][j].uid)

def test_save_load_nested_estimator(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
@@ -1186,6 +1230,40 @@ def test_parallel_evaluation(self):
tvsParallelModel = tvs.fit(dataset)
self.assertEqual(tvsSerialModel.validationMetrics, tvsParallelModel.validationMetrics)

def test_expose_sub_models(self):
temp_path = tempfile.mkdtemp()
dataset = self.spark.createDataFrame(
[(Vectors.dense([0.0]), 0.0),
(Vectors.dense([0.4]), 1.0),
(Vectors.dense([0.5]), 0.0),
(Vectors.dense([0.6]), 1.0),
(Vectors.dense([1.0]), 1.0)] * 10,
["features", "label"])
lr = LogisticRegression()
grid = ParamGridBuilder().addGrid(lr.maxIter, [0, 1]).build()
evaluator = BinaryClassificationEvaluator()
tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator,
collectSubModels=True)
tvsModel = tvs.fit(dataset)
self.assertEqual(len(tvsModel.subModels), len(grid))

# Test that the default value of the "persistSubModels" write option is "true"
testSubPath = temp_path + "/testTrainValidationSplitSubModels"
savingPathWithSubModels = testSubPath + "cvModel3"
tvsModel.save(savingPathWithSubModels)
tvsModel3 = TrainValidationSplitModel.load(savingPathWithSubModels)
self.assertEqual(len(tvsModel3.subModels), len(grid))
tvsModel4 = tvsModel3.copy()
self.assertEqual(len(tvsModel4.subModels), len(grid))

savingPathWithoutSubModels = testSubPath + "cvModel2"
tvsModel.write().option("persistSubModels", "false").save(savingPathWithoutSubModels)
tvsModel2 = TrainValidationSplitModel.load(savingPathWithoutSubModels)
self.assertEqual(tvsModel2.subModels, None)

for i in range(len(grid)):
self.assertEqual(tvsModel.subModels[i].uid, tvsModel3.subModels[i].uid)

def test_save_load_nested_estimator(self):
# This tests saving and loading the trained model only.
# Save/load for TrainValidationSplit will be added later: SPARK-13786