Fix PEP8 violations in Python mllib.
Author: Reynold Xin <[email protected]>

Closes apache#871 from rxin/mllib-pep8 and squashes the following commits:

848416f [Reynold Xin] Fixed a typo in the previous cleanup (c -> sc).
a8db4cd [Reynold Xin] Fix PEP8 violations in Python mllib.
rxin committed May 26, 2014
1 parent 14f0358 commit d33d3c6
Showing 8 changed files with 78 additions and 88 deletions.
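
Most of these changes re-wrap long statements and re-indent continuation lines to satisfy PEP8's line-length and continuation-indent rules. A representative before/after sketch (illustrative only; the indentation shown here is reconstructed, not copied from the diff):

    # before: continuation line not aligned with the opening delimiter
    raise TypeError("_serialize_double_vector called on a %s; "
            "wanted ndarray or SparseVector" % type(v))

    # after: continuation line aligned under the opening string literal
    raise TypeError("_serialize_double_vector called on a %s; "
                    "wanted ndarray or SparseVector" % type(v))
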
42 changes: 21 additions & 21 deletions python/pyspark/mllib/_common.py
@@ -56,7 +56,8 @@
#
# Sparse double vector format:
#
# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] [nonzeros*8 bytes of values]
# [1-byte 2] [4-byte length] [4-byte nonzeros] [nonzeros*4 bytes of indices] \
# [nonzeros*8 bytes of values]
#
# Double matrix format:
#
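
For context, a minimal sketch (not part of this commit) of packing a sparse vector into the layout described in the comment above. It mirrors the ndarray-over-bytearray approach used elsewhere in this module and assumes the sparse-vector magic byte is 2 and native byte order:

    import numpy as np

    # Illustrative sketch only: pack (size, indices, values) into the sparse
    # double vector layout described above.
    def pack_sparse_vector(size, indices, values):
        indices = np.asarray(indices, dtype=np.int32)
        values = np.asarray(values, dtype=np.float64)
        nonzeros = len(indices)
        ba = bytearray(9 + 12 * nonzeros)
        ba[0] = 2                                # [1-byte 2]: sparse-vector magic (assumed)
        header = np.ndarray(shape=[2], buffer=ba, offset=1, dtype=np.int32)
        header[0] = size                         # [4-byte length]
        header[1] = nonzeros                     # [4-byte nonzeros]
        idx = np.ndarray(shape=[nonzeros], buffer=ba, offset=9, dtype=np.int32)
        idx[:] = indices                         # [nonzeros*4 bytes of indices]
        vals = np.ndarray(shape=[nonzeros], buffer=ba,
                          offset=9 + 4 * nonzeros, dtype=np.float64)
        vals[:] = values                         # [nonzeros*8 bytes of values]
        return ba
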
@@ -110,18 +111,18 @@ def _serialize_double_vector(v):
return _serialize_sparse_vector(v)
else:
raise TypeError("_serialize_double_vector called on a %s; "
"wanted ndarray or SparseVector" % type(v))
"wanted ndarray or SparseVector" % type(v))


def _serialize_dense_vector(v):
"""Serialize a dense vector given as a NumPy array."""
if v.ndim != 1:
raise TypeError("_serialize_double_vector called on a %ddarray; "
"wanted a 1darray" % v.ndim)
"wanted a 1darray" % v.ndim)
if v.dtype != float64:
if numpy.issubdtype(v.dtype, numpy.complex):
raise TypeError("_serialize_double_vector called on an ndarray of %s; "
"wanted ndarray of float64" % v.dtype)
"wanted ndarray of float64" % v.dtype)
v = v.astype(float64)
length = v.shape[0]
ba = bytearray(5 + 8 * length)
@@ -158,10 +159,10 @@ def _deserialize_double_vector(ba):
"""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_vector called on a %s; "
"wanted bytearray" % type(ba))
"wanted bytearray" % type(ba))
if len(ba) < 5:
raise TypeError("_deserialize_double_vector called on a %d-byte array, "
"which is too short" % len(ba))
"which is too short" % len(ba))
if ba[0] == DENSE_VECTOR_MAGIC:
return _deserialize_dense_vector(ba)
elif ba[0] == SPARSE_VECTOR_MAGIC:
@@ -175,7 +176,7 @@ def _deserialize_dense_vector(ba):
"""Deserialize a dense vector into a numpy array."""
if len(ba) < 5:
raise TypeError("_deserialize_dense_vector called on a %d-byte array, "
"which is too short" % len(ba))
"which is too short" % len(ba))
length = ndarray(shape=[1], buffer=ba, offset=1, dtype=int32)[0]
if len(ba) != 8 * length + 5:
raise TypeError("_deserialize_dense_vector called on bytearray "
@@ -187,7 +188,7 @@ def _deserialize_sparse_vector(ba):
"""Deserialize a sparse vector into a MLlib SparseVector object."""
if len(ba) < 9:
raise TypeError("_deserialize_sparse_vector called on a %d-byte array, "
"which is too short" % len(ba))
"which is too short" % len(ba))
header = ndarray(shape=[2], buffer=ba, offset=1, dtype=int32)
size = header[0]
nonzeros = header[1]
@@ -205,7 +206,7 @@ def _serialize_double_matrix(m):
if m.dtype != float64:
if numpy.issubdtype(m.dtype, numpy.complex):
raise TypeError("_serialize_double_matrix called on an ndarray of %s; "
"wanted ndarray of float64" % m.dtype)
"wanted ndarray of float64" % m.dtype)
m = m.astype(float64)
rows = m.shape[0]
cols = m.shape[1]
@@ -225,10 +226,10 @@ def _deserialize_double_matrix(ba):
"""Deserialize a double matrix from a mutually understood format."""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_matrix called on a %s; "
"wanted bytearray" % type(ba))
"wanted bytearray" % type(ba))
if len(ba) < 9:
raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
"which is too short" % len(ba))
"which is too short" % len(ba))
if ba[0] != DENSE_MATRIX_MAGIC:
raise TypeError("_deserialize_double_matrix called on bytearray "
"with wrong magic")
@@ -267,7 +268,7 @@ def _copyto(array, buffer, offset, shape, dtype):
def _get_unmangled_rdd(data, serializer):
dataBytes = data.map(serializer)
dataBytes._bypass_serializer = True
dataBytes.cache() # TODO: users should unpersist() this later!
dataBytes.cache() # TODO: users should unpersist() this later!
return dataBytes


@@ -293,14 +294,14 @@ def _linear_predictor_typecheck(x, coeffs):
if type(x) == ndarray:
if x.ndim == 1:
if x.shape != coeffs.shape:
raise RuntimeError("Got array of %d elements; wanted %d"
% (numpy.shape(x)[0], coeffs.shape[0]))
raise RuntimeError("Got array of %d elements; wanted %d" % (
numpy.shape(x)[0], coeffs.shape[0]))
else:
raise RuntimeError("Bulk predict not yet supported.")
elif type(x) == SparseVector:
if x.size != coeffs.shape[0]:
raise RuntimeError("Got sparse vector of size %d; wanted %d"
% (x.size, coeffs.shape[0]))
raise RuntimeError("Got sparse vector of size %d; wanted %d" % (
x.size, coeffs.shape[0]))
elif (type(x) == RDD):
raise RuntimeError("Bulk predict not yet supported.")
else:
@@ -315,7 +316,7 @@ def _get_initial_weights(initial_weights, data):
if type(initial_weights) == ndarray:
if initial_weights.ndim != 1:
raise TypeError("At least one data element has "
+ initial_weights.ndim + " dimensions, which is not 1")
+ initial_weights.ndim + " dimensions, which is not 1")
initial_weights = numpy.zeros([initial_weights.shape[0]])
elif type(initial_weights) == SparseVector:
initial_weights = numpy.zeros([initial_weights.size])
@@ -333,10 +334,10 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
raise RuntimeError("JVM call result had unexpected length")
elif type(ans[0]) != bytearray:
raise RuntimeError("JVM call result had first element of type "
+ type(ans[0]).__name__ + " which is not bytearray")
+ type(ans[0]).__name__ + " which is not bytearray")
elif type(ans[1]) != float:
raise RuntimeError("JVM call result had second element of type "
+ type(ans[0]).__name__ + " which is not float")
+ type(ans[0]).__name__ + " which is not float")
return klass(_deserialize_double_vector(ans[0]), ans[1])


@@ -450,8 +451,7 @@ def _test():
import doctest
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs,
optionflags=doctest.ELLIPSIS)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
26 changes: 14 additions & 12 deletions python/pyspark/mllib/classification.py
@@ -29,6 +29,7 @@
from pyspark.mllib.regression import LabeledPoint, LinearModel
from math import exp, log


class LogisticRegressionModel(LinearModel):
"""A linear binary classification model derived from logistic regression.
@@ -68,14 +69,14 @@ def predict(self, x):

class LogisticRegressionWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0,
miniBatchFraction=1.0, initialWeights=None):
def train(cls, data, iterations=100, step=1.0, miniBatchFraction=1.0, initialWeights=None):
"""Train a logistic regression model on the given data."""
sc = data.context
return _regression_train_wrapper(sc, lambda d, i:
sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd,
iterations, step, miniBatchFraction, i),
LogisticRegressionModel, data, initialWeights)
train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(
d._jrdd, iterations, step, miniBatchFraction, i)
return _regression_train_wrapper(sc, train_func, LogisticRegressionModel, data,
initialWeights)
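
Illustrative usage of the refactored train call above (a sketch, not part of this diff; assumes an active SparkContext named sc):

    from numpy import array
    from pyspark.mllib.classification import LogisticRegressionWithSGD
    from pyspark.mllib.regression import LabeledPoint

    # Toy two-class data: label followed by a single feature.
    data = [LabeledPoint(0.0, [0.0]), LabeledPoint(1.0, [1.0]),
            LabeledPoint(1.0, [2.0]), LabeledPoint(1.0, [3.0])]
    lrm = LogisticRegressionWithSGD.train(sc.parallelize(data))
    print(lrm.predict(array([1.0])))   # should classify 1.0 as the positive class here
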


class SVMModel(LinearModel):
"""A support vector machine.
@@ -106,16 +107,17 @@ def predict(self, x):
margin = _dot(x, self._coeff) + self._intercept
return 1 if margin >= 0 else 0


class SVMWithSGD(object):
@classmethod
def train(cls, data, iterations=100, step=1.0, regParam=1.0,
miniBatchFraction=1.0, initialWeights=None):
"""Train a support vector machine on the given data."""
sc = data.context
return _regression_train_wrapper(sc, lambda d, i:
sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd,
iterations, step, regParam, miniBatchFraction, i),
SVMModel, data, initialWeights)
train_func = lambda d, i: sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(
d._jrdd, iterations, step, regParam, miniBatchFraction, i)
return _regression_train_wrapper(sc, train_func, SVMModel, data, initialWeights)


class NaiveBayesModel(object):
"""
@@ -156,6 +158,7 @@ def predict(self, x):
"""Return the most likely class for a data vector x"""
return self.labels[numpy.argmax(self.pi + _dot(x, self.theta.transpose()))]


class NaiveBayes(object):
@classmethod
def train(cls, data, lambda_=1.0):
@@ -186,8 +189,7 @@ def _test():
import doctest
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs,
optionflags=doctest.ELLIPSIS)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
15 changes: 7 additions & 8 deletions python/pyspark/mllib/clustering.py
@@ -30,7 +30,8 @@ class KMeansModel(object):
"""A clustering model derived from the k-means method.
>>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
>>> model = KMeans.train(sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
>>> model = KMeans.train(
... sc.parallelize(data), 2, maxIterations=10, runs=30, initializationMode="random")
>>> model.predict(array([0.0, 0.0])) == model.predict(array([1.0, 1.0]))
True
>>> model.predict(array([8.0, 9.0])) == model.predict(array([9.0, 8.0]))
@@ -76,18 +77,17 @@ def predict(self, x):

class KMeans(object):
@classmethod
def train(cls, data, k, maxIterations=100, runs=1,
initializationMode="k-means||"):
def train(cls, data, k, maxIterations=100, runs=1, initializationMode="k-means||"):
"""Train a k-means clustering model."""
sc = data.context
dataBytes = _get_unmangled_double_vector_rdd(data)
ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
k, maxIterations, runs, initializationMode)
ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(
dataBytes._jrdd, k, maxIterations, runs, initializationMode)
if len(ans) != 1:
raise RuntimeError("JVM call result had unexpected length")
elif type(ans[0]) != bytearray:
raise RuntimeError("JVM call result had first element of type "
+ type(ans[0]) + " which is not bytearray")
+ type(ans[0]) + " which is not bytearray")
matrix = _deserialize_double_matrix(ans[0])
return KMeansModel([row for row in matrix])

@@ -96,8 +96,7 @@ def _test():
import doctest
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs,
optionflags=doctest.ELLIPSIS)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
13 changes: 6 additions & 7 deletions python/pyspark/mllib/linalg.py
@@ -54,7 +54,7 @@ def __init__(self, size, *args):
if len(args) == 1:
pairs = args[0]
if type(pairs) == dict:
pairs = pairs.items()
pairs = pairs.items()
pairs = sorted(pairs)
self.indices = array([p[0] for p in pairs], dtype=int32)
self.values = array([p[1] for p in pairs], dtype=float64)
@@ -88,7 +88,7 @@ def dot(self, other):
result += self.values[i] * other[self.indices[i]]
return result
elif other.ndim == 2:
results = [self.dot(other[:,i]) for i in xrange(other.shape[1])]
results = [self.dot(other[:, i]) for i in xrange(other.shape[1])]
return array(results)
else:
raise Exception("Cannot call dot with %d-dimensional array" % other.ndim)
@@ -135,7 +135,7 @@ def squared_distance(self, other):
return result
else:
raise Exception("Cannot call squared_distance with %d-dimensional array" %
other.ndim)
other.ndim)
else:
result = 0.0
i, j = 0, 0
@@ -184,15 +184,14 @@ def __eq__(self, other):
"""

return (isinstance(other, self.__class__)
and other.size == self.size
and array_equal(other.indices, self.indices)
and array_equal(other.values, self.values))
and other.size == self.size
and array_equal(other.indices, self.indices)
and array_equal(other.values, self.values))

def __ne__(self, other):
return not self.__eq__(other)



class Vectors(object):
"""
Factory methods for working with vectors. Note that dense vectors
15 changes: 9 additions & 6 deletions python/pyspark/mllib/recommendation.py
@@ -24,6 +24,7 @@
_serialize_tuple, RatingDeserializer
from pyspark.rdd import RDD


class MatrixFactorizationModel(object):
"""A matrix factorisation model trained by regularized alternating
least-squares.
@@ -55,32 +56,34 @@ def predictAll(self, usersProducts):
return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
self._context, RatingDeserializer())


class ALS(object):
@classmethod
def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
sc = ratings.context
ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
rank, iterations, lambda_, blocks)
mod = sc._jvm.PythonMLLibAPI().trainALSModel(
ratingBytes._jrdd, rank, iterations, lambda_, blocks)
return MatrixFactorizationModel(sc, mod)

@classmethod
def trainImplicit(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
sc = ratings.context
ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
rank, iterations, lambda_, blocks, alpha)
mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(
ratingBytes._jrdd, rank, iterations, lambda_, blocks, alpha)
return MatrixFactorizationModel(sc, mod)
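
Illustrative usage of the two training entry points above (a sketch, not part of this diff; assumes an active SparkContext named sc and ratings given as (user, product, rating) tuples):

    from pyspark.mllib.recommendation import ALS

    ratings = sc.parallelize([(1, 1, 1.0), (1, 2, 2.0), (2, 1, 2.0)])
    model = ALS.train(ratings, rank=1, iterations=5)
    print(model.predict(2, 2))   # predicted rating for user 2, product 2
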


def _test():
import doctest
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs,
optionflags=doctest.ELLIPSIS)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)


if __name__ == "__main__":
_test()