Skip to content

Commit

Permalink
[PyAngel] fix bugs in runners while move from python2 to python3
Browse files Browse the repository at this point in the history
  • Loading branch information
biaoma-ty committed Dec 25, 2017
1 parent 7823a8d commit bfb3857
Show file tree
Hide file tree
Showing 11 changed files with 232 additions and 180 deletions.
8 changes: 5 additions & 3 deletions angel-ps/bin/pyangel
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ if [[ -z "${PYANGEL_PYTHON_SHELL}" ]]; then
PYANGEL_PYTHON_SHELL="${PYANGEL_PYTHON:-"python"}"
fi

WORKS_WITH_IPYTHON=$(python -c 'import sys; print(sys.version_info >= (2, 7, 0))')
WORKS_WITH_IPYTHON=$(python -c 'import sys; print(sys.version_info >= (3, 5, 0))')

if [[ -z "${PYANGEL_PYTHON}" ]]; then
if [[ ${PYANGEL_PYTHON_SHELL} == *ipython* && ! ${WORKS_WITH_IPYTHON} ]]; then
echo "IPython requires Python 2.7+; please install python2.7 or set PYANGEL_PYTHON" 1>&2
echo "IPython requires Python 3.5+; please install python3.5 or set PYANGEL_PYTHON" 1>&2
exit 1
else
PYANGEL_PYTHON=python
Expand All @@ -41,7 +41,9 @@ export PYANGEL_PYTHON

# Judge if it's local mode
if [[ $1 == "local" ]]; then
export PYANGEL_LOCAL_MODE=True
export PYANGEL_DIS_MODE=False
else
export PYANGEL_DIS_MODE=True
fi

# Add the PyAngel classes to the Python path:
Expand Down
50 changes: 22 additions & 28 deletions angel-ps/examples/src/main/python/fm_local_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
from pyangel.ml.client.angel_client_factory import AngelClientFactory
from pyangel.ml.factorizationmachines.runner import FMRunner

class FMLocalExample(oject):
class FMLocalExample(object):

def __init__(self):
self.conf = Configuration()
Expand All @@ -48,53 +48,50 @@ def set_conf(self):
stev = 0.1

# Set local deploy mode
self.conf.set(AngelConf.ANGEL_DEPLOY_MODE, "LOCAL")
self.conf[AngelConf.ANGEL_DEPLOY_MODE] = 'LOCAL'

# Set basic self.configuration keys
self.conf.set_boolean("mapred.mapper.new-api", True)
self.conf.set(AngelConf.ANGEL_INPUTFORMAT_CLASS, 'org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat')
self.conf.set_boolean(AngelConf.ANGEL_JOB_OUTPUT_PATH_DELETEONEXIST, True)
self.conf['mapred.mapper.new-api'] = True
self.conf[AngelConf.ANGEL_INPUTFORMAT_CLASS] = 'org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat'
self.conf[AngelConf.ANGEL_JOB_OUTPUT_PATH_DELETEONEXIST] = True

#set angel resource parameters #worker, #task, #PS
self.conf.set_int(AngelConf.ANGEL_WORKERGROUP_NUMBER, 1)
self.conf.set_int(AngelConf.ANGEL_WORKER_TASK_NUMBER, 1)
self.conf.set_int(AngelConf.ANGEL_PS_NUMBER, 1)
self.conf[AngelConf.ANGEL_WORKERGROUP_NUMBER] = 1
self.conf[AngelConf.ANGEL_WORKER_TASK_NUMBER] = 1
self.conf[AngelConf.ANGEL_PS_NUMBER] = 1

#set FM algorithm parameters #feature #epoch
self.conf.set(MLConf.ML_FEATURE_NUM, str(feature_num))
self.conf.set(MLConf.ML_EPOCH_NUM, str(epoch_num))
self.conf.set(MLConf.ML_FM_RANK, str(rank))
self.conf.set(MLConf.ML_LEARN_RATE, str(lr))
self.conf.set(MLConf.ML_FM_REG0, str(reg0))
self.conf.set(MLConf.ML_FM_REG1, str(reg1))
self.conf.set(MLConf.ML_FM_REG2, str(reg2))
self.conf.set(MLConf.ML_FM_V_STDDEV, str(stev))
self.conf[MLConf.ML_FEATURE_NUM] = str(feature_num)
self.conf[MLConf.ML_EPOCH_NUM] = str(epoch_num)
self.conf[MLConf.ML_FM_RANK] = str(rank)
self.conf[MLConf.ML_LEARN_RATE] = str(lr)
self.conf[MLConf.ML_FM_REG0] = str(reg0)
self.conf[MLConf.ML_FM_REG1] = str(reg1)
self.conf[MLConf.ML_FM_REG2] = str(reg2)
self.conf[MLConf.ML_FM_V_STDDEV] = str(stev)

def train_on_local_cluster(self):
self.set_conf()
input_path = "./src/test/data/fm/food_fm_libsvm"
input_path = "data/fm/food_fm_libsvm"
LOCAL_FS = LocalFileSystem.DEFAULT_FS
TMP_PATH = tempfile.gettempdir()
save_path = LOCAL_FS + TMP_PATH + "/model"
log_path = LOCAL_FS + TMP_PATH + "/LRlog"

# Set trainning data path
self.conf.set(AngelConf.ANGEL_TRAIN_DATA_PATH, input_path)
self.conf[AngelConf.ANGEL_TRAIN_DATA_PATH] = input_path
# Set save model path
self.conf.set(AngelConf.ANGEL_SAVE_MODEL_PATH, save_path)
self.conf[AngelConf.ANGEL_SAVE_MODEL_PATH] = save_path
# Set log path
self.conf.set(AngelConf.ANGEL_LOG_PATH, log_path)
self.conf[AngelConf.ANGEL_LOG_PATH] = log_path
# Set actionType train
self.conf.set(AngelConf.ANGEL_ACTION_TYPE, MLConf.ANGEL_ML_TRAIN())
self.conf[AngelConf.ANGEL_ACTION_TYPE] = MLConf.ANGEL_ML_TRAIN

runner = FMRunner()
runner.train(self.conf)

angel_client = AngelClientFactory.get(self.conf)
angel_client.stop()

def fm_classification(self):
input_path = "./src/test/data/fm/a9a.train"
input_path = "data/fm/a9a.train"
LOCAL_FS = LocalFileSystem.DEFAULT_FS
TMP_PATH = tempfile.gettempdir()
save_path = LOCAL_FS + TMP_PATH + "/model"
Expand All @@ -116,8 +113,5 @@ def fm_classification(self):
runner = FMRunner()
runner.train(self.conf)

angel_client = AngelClientFactory.get(self.conf)
angel_client.stop()

example = FMLocalExample()
example.train_on_local_cluster()
3 changes: 3 additions & 0 deletions angel-ps/examples/src/main/python/gbdt_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# either express or implied. See the License for the specific language governing permissions and
#

import tempfile

from pyangel.conf import AngelConf
from pyangel.context import Configuration
from pyangel.ml.conf import MLConf
Expand Down Expand Up @@ -55,6 +57,7 @@ def set_conf(self):
self.conf[AngelConf.ANGEL_SAVE_MODEL_PATH] = output_path

# Set GBDT algorithm parameters
self.conf[MLConf.ML_DATA_FORMAT] = dataFmt
self.conf[MLConf.ML_FEATURE_NUM] = feature_num
self.conf[MLConf.ML_FEATURE_NNZ] = feature_nzz
self.conf[MLConf.ML_GBDT_TREE_NUM] = tree_num
Expand Down
67 changes: 56 additions & 11 deletions angel-ps/examples/src/main/python/gbdt_local_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@
# either express or implied. See the License for the specific language governing permissions and
#

import tempfile

from hadoop.local_fs import LocalFileSystem

from pyangel.conf import AngelConf
from pyangel.context import Configuration
Expand All @@ -26,14 +29,15 @@ def __init__(self):
self.MLConf = MLConf()

def set_conf(self):
# Input Path, please modify ${YOUR_ANGEL_HOME} as your local angel installation path,
# e.g. if your path is /home/angel/angel_1.3.0, your input_path should be:
# "file:///home/angel/angel_1.3.0/data/exampledata/GBDTLocalExampleData/agaricus.txt.train",
# and your out_path could be: "file:///home/angel/angel_1.3.0/data/output"
# if you need, you can delete the annotation mark before Line35,Line36,Line61,Line62, so
# there is no need for you to pass the configs every time you submit the pyangel job.
#input_path = "file:///${YOUR_ANGEL_HOME}/data/exampledata/GBDTLocalExampleData/agaricus.txt.train"
#output_path = "file:///${YOUR_ANGEL_HOME}/data/output"
"""
Input Path, please modify ${YOUR_ANGEL_HOME} as your local angel installation path,
e.g. if your path is /home/angel/angel_1.3.0, your input_path should be:
"file:///home/angel/angel_1.3.0/data/exampledata/GBDTLocalExampleData/agaricus.txt.train",
and your out_path could be: "file:///home/angel/angel_1.3.0/data/output"
if you need, you can delete the annotation mark before Line35,Line36,Line61,Line62, so
there is no need for you to pass the configs every time you submit the pyangel job.
:return:
"""
# Feature number of train data
feature_num = 127
# Number of nonzero features
Expand All @@ -56,12 +60,17 @@ def set_conf(self):
# Use local deploy mode and dummy data spliter
self.conf[AngelConf.ANGEL_DEPLOY_MODE] = "LOCAL"

# set input] = output path
self.conf['mapred.mapper.new-api'] = True
self.conf[AngelConf.ANGEL_INPUTFORMAT_CLASS] = 'org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat'
#self.conf[AngelConf.ANGEL_TRAIN_DATA_PATH] = input_path
#self.conf[AngelConf.ANGEL_SAVE_MODEL_PATH] = output_path
self.conf[AngelConf.ANGEL_JOB_OUTPUT_PATH_DELETEONEXIST] = True

# Set angel resource parameters #worker, #task, #PS
self.conf[AngelConf.ANGEL_WORKERGROUP_NUMBER] = 1
self.conf[AngelConf.ANGEL_WORKER_TASK_NUMBER] = 1
self.conf[AngelConf.ANGEL_PS_NUMBER] = 1

# Set GBDT algorithm parameters
self.conf[MLConf.ML_DATA_FORMAT] = data_fmt
self.conf[MLConf.ML_FEATURE_NUM] = str(feature_num)
self.conf[MLConf.ML_FEATURE_NNZ] = str(feature_nzz)
self.conf[MLConf.ML_GBDT_TREE_NUM] = str(tree_num)
Expand All @@ -70,9 +79,45 @@ def set_conf(self):
self.conf[MLConf.ML_GBDT_SAMPLE_RATIO] = str(sample_ratio)
self.conf[MLConf.ML_LEARN_RATE] = str(learn_rate)

params = {
AngelConf.ANGEL_DEPLOY_MODE:'LOCAL',
'mapred.mapper.new-api':True,
AngelConf.ANGEL_INPUTFORMAT_CLASS:'org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat',
AngelConf.ANGEL_JOB_OUTPUT_PATH_DELETEONEXIST:True,
AngelConf.ANGEL_WORKERGROUP_NUMBER:1,
AngelConf.ANGEL_WORKER_TASK_NUMBER:1,
AngelConf.ANGEL_PS_NUMBER:1,
MLConf.ML_DATA_FORMAT:'libsvm',
MLConf.ML_FEATURE_NUM:127,
MLConf.ML_FEATURE_NNZ:25,
MLConf.ML_GBDT_TREE_NUM:2,
MLConf.ML_GBDT_TREE_DEPTH:2,
MLConf.ML_GBDT_SPLIT_NUM:10,
MLConf.ML_GBDT_SAMPLE_RATIO:1.0,
MLConf.ML_LEARN_RATE:0.01
}

self.conf.load(params)

def train(self):
self.set_conf()

LOCAL_FS = LocalFileSystem.DEFAULT_FS
TMP_PATH = tempfile.gettempdir()
save_path = LOCAL_FS + TMP_PATH + "/model"
log_path = LOCAL_FS + TMP_PATH + "/GBDTlog"
input_path = "data/exampledata/GBDTLocalExampleData/agaricus.txt.train"
output_path = "data/output"

self.conf[AngelConf.ANGEL_TRAIN_DATA_PATH] = input_path
self.conf[AngelConf.ANGEL_SAVE_MODEL_PATH] = output_path

self.conf[AngelConf.ANGEL_SAVE_MODEL_PATH] = save_path
# Set log path
self.conf[AngelConf.ANGEL_LOG_PATH] = log_path
# Set actionType train
self.conf[AngelConf.ANGEL_ACTION_TYPE] = MLConf.ANGEL_ML_TRAIN

runner = GBDTRunner()
runner.train(self.conf)

Expand Down
49 changes: 23 additions & 26 deletions angel-ps/examples/src/main/python/k_means_local_exmple.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from pyangel.context import Configuration
from pyangel.ml.conf import MLConf
from pyangel.ml.client.angel_client_factory import AngelClientFactory
from pyangel.ml.clustering.k_means import KMeansRunner
from pyangel.ml.clustering.runner import KMeansRunner

class KmeansLocalExample(object):

Expand All @@ -44,60 +44,57 @@ def set_conf(self):
c = 0.15

# Set local deploy mode
self.conf.set(AngelConf.ANGEL_DEPLOY_MODE, "LOCAL")
self.conf[AngelConf.ANGEL_DEPLOY_MODE] = 'LOCAL'

# Set basic self.configuration key
self.conf.set_boolean("mapred.mapper.new-api", True)
self.conf.set(AngelConf.ANGEL_INPUTFORMAT_CLASS, 'org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat')
self.conf.set_boolean(AngelConf.ANGEL_JOB_OUTPUT_PATH_DELETEONEXIST, True)
self.conf["mapred.mapper.new-api"] = True
self.conf[AngelConf.ANGEL_INPUTFORMAT_CLASS] = 'org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat'
self.conf[AngelConf.ANGEL_JOB_OUTPUT_PATH_DELETEONEXIST] = True

# Set angel resource parameters #worker, #task, #PS
self.conf.set_int(AngelConf.ANGEL_WORKERGROUP_NUMBER, 1)
self.conf.set_int(AngelConf.ANGEL_WORKER_TASK_NUMBER, 1)
self.conf.set_int(AngelConf.ANGEL_PS_NUMBER, 1)
self.conf[AngelConf.ANGEL_WORKERGROUP_NUMBER] = 1
self.conf[AngelConf.ANGEL_WORKER_TASK_NUMBER] = 1
self.conf[AngelConf.ANGEL_PS_NUMBER] = 1

# Set Kmeans algorithm parameters #cluster #feature #epoch
self.conf.set(MLConf.KMEANS_CENTER_NUM, str(center_num))
self.conf.set(MLConf.ML_FEATURE_NUM, str(feature_num))
self.conf.set(MLConf.ML_EPOCH_NUM, str(epoch_num))
self.conf.set(MLConf.KMEANS_SAMPLE_RATIO_PERBATCH, str(sample_ratio))
self.conf.set(MLConf.kMEANS_C, str(c))
self.conf[MLConf.KMEANS_CENTER_NUM] = str(center_num)
self.conf[MLConf.ML_FEATURE_NUM] = str(feature_num)
self.conf[MLConf.ML_EPOCH_NUM] = str(epoch_num)
self.conf[MLConf.KMEANS_SAMPLE_RATIO_PERBATCH] = str(sample_ratio)
self.conf[MLConf.kMEANS_C] = str(c)

# Set data format
self.conf.set(MLConf.ML_DATAFORMAT, data_fmt)
self.conf[MLConf.ML_DATA_FORMAT] = data_fmt

def train(self):
self.set_conf()
input_path = "data/exampledata/clusteringLocalExampleData/iris"
LOCAL_FS = LocalFileSystem.DEFAULT_FS
TMP_PATH = tempfile.gettempdir()
# Set trainning data path
self.conf.set(AngelConf.ANGEL_TRAIN_DATA_PATH, input_path)
self.conf[AngelConf.ANGEL_TRAIN_DATA_PATH] = input_path
# Set save model path
self.conf.set(AngelConf.ANGEL_SAVE_MODEL_PATH, LOCAL_FS + TMP_PATH + "/model")
self.conf[AngelConf.ANGEL_SAVE_MODEL_PATH] = LOCAL_FS + TMP_PATH + "/model"
# Set log sava path
self.conf.set(AngelConf.ANGEL_LOG_PATH, LOCAL_FS + TMP_PATH + "/kmeansLog/log")
self.conf[AngelConf.ANGEL_LOG_PATH] = LOCAL_FS + TMP_PATH + "/kmeansLog/log"
# Set actionType train
self.conf.set(AngelConf.ANGEL_ACTION_TYPE, MLConf.ANGEL_ML_TRAIN)
self.conf[AngelConf.ANGEL_ACTION_TYPE] = MLConf.ANGEL_ML_TRAIN

runner = KMeansRunner()
runner.train(self.conf)

angel_client = AngelClientFactory.get(self.conf)
angel_client.stop()

def predict_onLocal_cluster(self):
def predict_on_local_cluster(self):
self.set_conf()
LOCAL_FS = LocalFileSystem.DEFAULT_FS
TMP_PATH = tempfile.gettempdir()
# Set trainning data path
self.conf.set(AngelConf.ANGEL_PREDICT_DATA_PATH, input_path)
self.conf[AngelConf.ANGEL_PREDICT_DATA_PATH] = input_path
# Set load model path
self.conf.set(AngelConf.ANGEL_LOAD_MODEL_PATH, LOCAL_FS + TMP_PATH + "/model")
self.conf[AngelConf.ANGEL_LOAD_MODEL_PATH] = LOCAL_FS + TMP_PATH + "/model"
# Set predict result path
self.conf.set(AngelConf.ANGEL_PREDICT_PATH, LOCAL_FS + TMP_PATH + "/predict")
self.conf[AngelConf.ANGEL_PREDICT_PATH] = LOCAL_FS + TMP_PATH + "/predict"
# Set actionType prediction
self.conf.set(AngelConf.ANGEL_ACTION_TYPE, MLConf.ANGEL_ML_PREDICT)
self.conf[AngelConf.ANGEL_ACTION_TYPE] = MLConf.ANGEL_ML_PREDICT

runner = KMeansRunner()
runner.predict(self.conf)
Expand Down
Loading

0 comments on commit bfb3857

Please sign in to comment.