Skip to content

Commit

Permalink
Merge pull request Angel-ML#1 from Tencent/master
Browse files Browse the repository at this point in the history
ri
  • Loading branch information
Chris19920210 authored Sep 18, 2017
2 parents ecc06bf + 3d23145 commit e4f855c
Show file tree
Hide file tree
Showing 427 changed files with 47,367 additions and 7,905 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@


[![license](http://img.shields.io/badge/license-BSD3-blue.svg?style=flat)](https://github.com/tencent/angel/blob/master/LICENSE)
[![Release Version](https://img.shields.io/badge/release-1.1.0-red.svg)](https://github.com/tencent/angel/releases)
[![Release Version](https://img.shields.io/badge/release-1.2.0-red.svg)](https://github.com/tencent/angel/releases)
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/tencent/angel/pulls)

[(English Documents Available)](./README_en.md)

**Angel**是一个基于参数服务器(Parameter Server)理念开发的高性能分布式机器学习平台,它基于腾讯内部的海量数据进行了反复的调优,并具有广泛的适用性和稳定性,模型维度越高,优势越明显。 **Angel**由腾讯和北京大学联合开发,兼顾了工业界的高可用性和学术界的创新性。

Expand Down
4 changes: 2 additions & 2 deletions README_en.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


[![license](http://img.shields.io/badge/license-BSD3-brightgreen.svg?style=flat)](https://github.com/tencent/angel/blob/master/LICENSE)
[![Release Version](https://img.shields.io/badge/release-1.0.0-red.svg)](https://github.com/tencent/angel/releases)
[![Release Version](https://img.shields.io/badge/release-1.2.0-red.svg)](https://github.com/tencent/angel/releases)
[![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)](https://github.com/tencent/angel/pulls)

**Angel** is a high-performance distributed machine learning platform based on the philosophy of Parameter Server. It is tuned for performance with big data from Tencent and has a wide range of applicability and stability, demonstrating increasing advantage in handling higher dimension model. Angel is jointly developed by Tencent and Peking University, taking account of both high availability in industry and innovation in academia.
Expand All @@ -16,7 +16,7 @@ We welcome everyone interested in machine learning to contribute code, create is
## Introduction to Angel

* [Architecture](./docs/overview/architecture_en.md)
* [Code Framework](./docs/overview/code_framework.md)
* [Code Framework](./docs/overview/code_framework_en.md)
* [Design](./docs/overview/design_philosophy_en.md)
* [Spark on Angel](./docs/overview/spark_on_angel_en.md)

Expand Down
35 changes: 35 additions & 0 deletions angel-ps/bin/angel-javis-submit
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/usr/bin/env bash

if [ "$HADOOP_HOME" = "" ]; then
echo "HADOOP_HOME is not set"
exit 1
fi

bin=`which $0`
bin=`dirname ${bin}`
bin=`cd "$bin"; pwd`

conf="$bin"/../conf
lib="$bin"/../lib

echo $lib
export ANGEL_HOME="$bin/../"

DEFAULT_LIBEXEC_DIR="$HADOOP_HOME"/libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh

for f in "$lib"/*.jar; do
echo $f
if [ "$CLASSPATH" ]; then
export CLASSPATH=$CLASSPATH:$f
else
export CLASSPATH=$f
fi
done

export CLASS="com.tencent.angel.utils.AngelRunJar"

export CLASSPATH=$CLASSPATH
echo "$ANGEL_HOME"
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"
45 changes: 45 additions & 0 deletions angel-ps/bin/angel-submit-with-jarvis
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#!/usr/bin/env bash

export HADOOP_HOME="/data/tesla/tdwgaia_angel"
export JAVA_HOME="/data/tesla/jdk1.8.0_101"
export PATH=$JAVA_HOME/bin:$PATH

if [ "$HADOOP_HOME" = "" ]; then
echo "HADOOP_HOME is not set"
exit 1
fi

bin=`which $0`
bin=`dirname ${bin}`
bin=`cd "$bin"; pwd`

conf="$bin"/../conf
lib="$bin"/../lib

echo $lib
export ANGEL_HOME="$bin/../"

DEFAULT_LIBEXEC_DIR="$HADOOP_HOME"/libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
. $HADOOP_LIBEXEC_DIR/hadoop-config.sh

for f in "$lib"/*.jar; do
echo $f
if [ "$CLASSPATH" ]; then
export CLASSPATH=$CLASSPATH:$f
else
export CLASSPATH=$f
fi
done


export CLASS="com.tencent.jarvis.angel.middleware.submit4Angel"
export JAVA=/data/tesla/jdk1.8.0_101/bin/java

echo "$ANGEL_HOME"
echo $CLASSPATH
echo "class is:" $CLASS

echo $JAVA $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@" "-Dfs.defaultFS=hdfs://tl-nn-tdw.tencent-distribute.com:54310"
exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@"

28 changes: 14 additions & 14 deletions angel-ps/core/src/main/java/com/tencent/angel/PartitionKey.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public class PartitionKey implements Comparable<PartitionKey>, Serialize {
* [StartCol, endCol);
*/
int startRow = 0;
int startCol = 0;
long startCol = 0;
int endRow = 0;
int endCol = 0;
long endCol = 0;

public PartitionKey() {}

public PartitionKey(int partitionId, int matrixId, int startRow, int startCol, int endRow,
int endCol) {
public PartitionKey(int partitionId, int matrixId, int startRow, long startCol, int endRow,
long endCol) {
super();
this.partitionId = partitionId;
this.matrixId = matrixId;
Expand Down Expand Up @@ -84,7 +84,7 @@ public int getStartRow() {
return startRow;
}

public int getStartCol() {
public long getStartCol() {
return startCol;
}

Expand All @@ -96,7 +96,7 @@ public int getEndRow() {
return endRow;
}

public int getEndCol() {
public long getEndCol() {
return endCol;
}

Expand All @@ -106,16 +106,16 @@ public void setEndCol(int endCol) {

public void write(DataOutputStream out) throws IOException {
out.writeInt(startRow);
out.writeInt(startCol);
out.writeLong(startCol);
out.writeInt(endRow);
out.writeInt(endCol);
out.writeLong(endCol);
}

public void read(DataInputStream input) throws IOException {
startRow = input.readInt();
startCol = input.readInt();
startCol = input.readLong();
endRow = input.readInt();
endCol = input.readInt();
endCol = input.readLong();
}

public void setPartitionId(int partitionId) {
Expand Down Expand Up @@ -157,8 +157,8 @@ public void serialize(ByteBuf buf) {
buf.writeInt(partitionId);
buf.writeInt(startRow);
buf.writeInt(endRow);
buf.writeInt(startCol);
buf.writeInt(endCol);
buf.writeLong(startCol);
buf.writeLong(endCol);
}

@Override
Expand All @@ -167,8 +167,8 @@ public void deserialize(ByteBuf buf) {
partitionId = buf.readInt();
startRow = buf.readInt();
endRow = buf.readInt();
startCol = buf.readInt();
endCol = buf.readInt();
startCol = buf.readLong();
endCol = buf.readLong();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ protected void updateMaster(int maxWaitSeconds) throws Exception {
Thread.sleep(1000);
tryTime++;
} else {
LOG.info("appMaster getTrackingUrl = " + appMaster.getTrackingUrl());
LOG.info("appMaster getTrackingUrl = " + appMaster.getTrackingUrl().replace("proxy", "cluster/app"));
LOG.info("master host=" + host + ", port=" + port);
try {
masterLocation = new Location(host, port);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import io.netty.buffer.ByteBuf;

import java.util.Queue;

/**
* Serialize interface. It used by Netty to transfer data.
*/
Expand Down
61 changes: 40 additions & 21 deletions angel-ps/core/src/main/java/com/tencent/angel/conf/AngelConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -397,23 +397,24 @@ public AngelConf(Configuration conf) {
*/
public static final String ANGEL_WORKER_HEARTBEAT_TIMEOUT_MS = ANGEL_WORKER_PREFIX
+ "heartbeat.timeout.ms";
public static final long DEFAULT_ANGEL_WORKER_HEARTBEAT_TIMEOUT_MS = 60000;
public static final long DEFAULT_ANGEL_WORKER_HEARTBEAT_TIMEOUT_MS = 600000;

public static final String ANGEL_WORKERGROUP_FAILED_TOLERATE = ANGEL_WORKERGROUP_PREFIX
+ "failed.tolerate";
public static final double DEFAULT_WORKERGROUP_FAILED_TOLERATE = 0.1;

public static final String ANGEL_TASK_ERROR_TOLERATE = ANGEL_PREFIX + "task.error.tolerate";
public static final double DEFAULT_ANGEL_TASK_ERROR_TOLERATE = 0.01;

public static final String ANGEL_WORKER_TVECTOR_POOL_ENABLE = ANGEL_WORKER_PREFIX
+ "tvector.pool.enable";
public static final boolean DEFAULT_ANGEL_WORKER_TVECTOR_POOL_ENABLE = false;

/** The maximum number of times AppMaster can try. */
public static final String ANGEL_WORKER_MAX_ATTEMPTS = ANGEL_WORKER_PREFIX + "max-attempts";
public static final int DEFAULT_WORKER_MAX_ATTEMPTS = 4;

/** The workers number for matrix operations */
public static final String ANGEL_WORKER_MATRIX_EXECUTORS_NUM = ANGEL_WORKER_PREFIX + "matrix.executors.num";
public static final int DEFAULT_ANGEL_WORKER_MATRIX_EXECUTORS_NUM = 16;


// //////////////////////////////
// Task Configs
// //////////////////////////////
Expand Down Expand Up @@ -536,7 +537,7 @@ public AngelConf(Configuration conf) {
*/
public static final String ANGEL_PS_HEARTBEAT_TIMEOUT_MS = ANGEL_PS_PREFIX
+ "heartbeat.timeout.ms";
public static final long DEFAULT_ANGEL_PS_HEARTBEAT_TIMEOUT_MS = 60000;
public static final long DEFAULT_ANGEL_PS_HEARTBEAT_TIMEOUT_MS = 600000;

/** The number of worker threads for commiting matrices in ps. */
public static final String ANGEL_PS_COMMIT_TASK_NUM = ANGEL_PS_PREFIX + "commit.task.number";
Expand All @@ -547,11 +548,11 @@ public AngelConf(Configuration conf) {
public static final Class<?> DEFAULT_ANGEL_PS_ROW_UPDATER = DefaultRowUpdater.class;

/** PS executors thread pool size */
public static final String ANGEL_PS_WORKERPOOL_SIZE = ANGEL_PS_PREFIX +
"workerpool.size";
public static final String ANGEL_PS_MATRIX_DISKIO_WORKER_POOL_SIZE = ANGEL_PS_PREFIX +
"matrix.diskio.worker.pool.size";

/** Default PS executors thread pool size */
public static final int DEFAULT_ANGEL_PS_WORKERPOOL_SIZE = Runtime.getRuntime().availableProcessors();
public static final int DEFAULT_ANGEL_PS_MATRIX_DISKIO_WORKER_POOL_SIZE = Math.max(16, (int)(Runtime.getRuntime().availableProcessors() * 0.25));


// ////////////////// IPC //////////////////////////
Expand Down Expand Up @@ -616,15 +617,41 @@ public AngelConf(Configuration conf) {
*/
public static final String ANGEL_MATRIXTRANSFER_MAX_REQUESTNUM_PERSERVER = ANGEL_PREFIX
+ "matrixtransfer.max.requestnum.perserver";
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_MAX_REQUESTNUM_PERSERVER = 4;
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_MAX_REQUESTNUM_PERSERVER = 16;

public static final String ANGEL_MATRIXTRANSFER_CLIENT_REQUESTER_POOL_SIZE = ANGEL_PREFIX
+ "matrixtransfer.client.requester.pool.size";
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_CLIENT_REQUESTER_POOL_SIZE = Math.max(16, (int)(Runtime.getRuntime().availableProcessors() * 0.5));

public static final String ANGEL_MATRIXTRANSFER_CLIENT_RESPONSER_POOL_SIZE = ANGEL_PREFIX
+ "matrixtransfer.client.responser.pool.size";
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_CLIENT_RESPONSER_POOL_SIZE = Math.max(16, (int)(Runtime.getRuntime().availableProcessors() * 0.5));


public static final String ANGEL_MATRIXTRANSFER_SERVER_WORKER_POOL_SIZE = ANGEL_PREFIX
+ "matrixtransfer.server.worker.pool.size";
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_SERVER_WORKER_POOL_SIZE = Runtime.getRuntime().availableProcessors();

public static final String ANGEL_MATRIXTRANSFER_SERVER_SENDER_POOL_SIZE = ANGEL_PREFIX
+ "matrixtransfer.server.sender.pool.size";
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_SERVER_SENDER_POOL_SIZE = Math.max(8, (int)(Runtime.getRuntime().availableProcessors() * 0.25));

public static final String ANGEL_MATRIXTRANSFER_SERVER_USER_SENDER = ANGEL_PREFIX
+ "matrixtransfer.server.user.sender";
public static final boolean DEFAULT_ANGEL_MATRIXTRANSFER_SERVER_USER_SENDER = false;

public static final String ANGEL_MATRIX_OPLOG_MERGER_POOL_SIZE = ANGEL_PREFIX
+ "matrix.oplog.merger.pool.size";

public static final int DEFAULT_ANGEL_MATRIX_OPLOG_MERGER_POOL_SIZE = Math.max(8, (int)(Runtime.getRuntime().availableProcessors() * 0.25));

/**
* The maximum allowed number of matrix transfer requests which are sending to the servers(ps). It
* used to flow-control between psagent and ps.
*/
public static final String ANGEL_MATRIXTRANSFER_MAX_REQUESTNUM = ANGEL_PREFIX
+ "matrixtransfer.max.requestnum";
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_MAX = 64;
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_MAX = 1024;

/**
* The maximum allowed size of requests/responses which are in flight. It used to flow-control
Expand All @@ -644,7 +671,7 @@ public AngelConf(Configuration conf) {
/** The time interval in milliseconds for failed matrix transfer requests. */
public static final String ANGEL_MATRIXTRANSFER_RETRY_INTERVAL_MS = ANGEL_PREFIX
+ "matrixtransfer.retry.interval.ms";
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_RETRY_INTERVAL_MS = 3000;
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_RETRY_INTERVAL_MS = 2000;

/** Weather we need use direct buffer in netty client. */
public static final String ANGEL_NETTY_MATRIXTRANSFER_CLIENT_USEDIRECTBUFFER =
Expand All @@ -671,7 +698,7 @@ public AngelConf(Configuration conf) {
*/
public static final String ANGEL_MATRIXTRANSFER_CHECK_INTERVAL_MS = ANGEL_PREFIX
+ "matrixtransfer.check.interval.ms";
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_CHECK_INTERVAL_MS = 1000;
public static final int DEFAULT_ANGEL_MATRIXTRANSFER_CHECK_INTERVAL_MS = 100;

// //////////////////////////////
// Matrix transfer Configs.
Expand Down Expand Up @@ -703,14 +730,6 @@ public AngelConf(Configuration conf) {
+ "sync.clock.enable";
public static final boolean DEFAULT_ANGEL_PSAGENT_SYNC_CLOCK_ENABLE = true;

/** PSAgent executors thread pool size */
public static final String ANGEL_PSAGENT_WORKERPOOL_SIZE = ANGEL_PSAGENT_PREFIX +
"workerpool.size";

/** Default PSAgent executors thread pool size */
public static final int DEFAULT_ANGEL_PSAGENT_WORKERPOOL_SIZE = Runtime.getRuntime().availableProcessors();


// Configs used to ANGEL_PS_PSAGENT running mode future.
public static final String ANGEL_PSAGENT_NUMBER = ANGEL_PSAGENT_PREFIX + "number";
public static final int DEFAULT_ANGEL_PSAGENT_NUMBER = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,12 @@ public class MatrixConf {
/** matrix data files save path */
public static final String MATRIX_SAVE_PATH = "matrix.save.path";
public static final String DEFAULT_MATRIX_SAVE_PATH = "";

/** the number of thread to load matrix partition */
public static final String MATRIX_LOAD_THREAD = "matrix.load.thread";
public static final String DEFAULT_MATRIX_LOAD_THREAD = "20";

/** the number of thread to write matrix partition */
public static final String MATRIX_WRITE_THREAD = "matrix.load.thread";
public static final String DEFAULT_MATRIX_WRITE_THREAD = "20";
}
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
* Tencent is pleased to support the open source community by making Angel available.
*
* Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*
*/



package com.tencent.angel.ipc;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,6 @@ public void serialize(FSDataOutputStream output) throws IOException {
}

for(it.unimi.dsi.fastutil.ints.Int2ObjectMap.Entry<MatrixProto> entry:matrixProtoMap.int2ObjectEntrySet()) {
LOG.info("write meta for matrix " + entry.getValue());
entry.getValue().writeDelimitedTo(output);
}
} finally {
Expand Down
Loading

0 comments on commit e4f855c

Please sign in to comment.