Skip to content

Commit

Permalink
[Release-1.4.0]
Browse files Browse the repository at this point in the history
  • Loading branch information
paynie committed Jan 11, 2018
1 parent 8448741 commit 230b9e5
Show file tree
Hide file tree
Showing 612 changed files with 56,807 additions and 10,175 deletions.
2 changes: 1 addition & 1 deletion angel-ps/bin/angel-submit
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ conf="$bin"/../conf
lib="$bin"/../lib

export ANGEL_HOME="$bin/../"
export ANGEL_VERSION=1.3.0
export ANGEL_VERSION=1.4.0

DEFAULT_LIBEXEC_DIR="$HADOOP_HOME"/libexec
HADOOP_LIBEXEC_DIR=${HADOOP_LIBEXEC_DIR:-$DEFAULT_LIBEXEC_DIR}
Expand Down
6 changes: 3 additions & 3 deletions angel-ps/bin/angel-submit-with-jarvis
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/usr/bin/env bash

export HADOOP_HOME="/data/tesla/tdwgaia_angel"
export JAVA_HOME="/data/tesla/jdk1.8.0_101"
export ANGEL_VERSION=1.3.0
export HADOOP_HOME=""
export JAVA_HOME=""
export ANGEL_VERSION=1.4.0
export PATH=$JAVA_HOME/bin:$PATH

if [ "$HADOOP_HOME" = "" ]; then
Expand Down
8 changes: 3 additions & 5 deletions angel-ps/bin/pyangel
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ if [[ -z "${PYANGEL_PYTHON_SHELL}" ]]; then
PYANGEL_PYTHON_SHELL="${PYANGEL_PYTHON:-"python"}"
fi

WORKS_WITH_IPYTHON=$(python -c 'import sys; print(sys.version_info >= (3, 5, 0))')
WORKS_WITH_IPYTHON=$(python -c 'import sys; print(sys.version_info >= (2, 7, 0))')

if [[ -z "${PYANGEL_PYTHON}" ]]; then
if [[ ${PYANGEL_PYTHON_SHELL} == *ipython* && ! ${WORKS_WITH_IPYTHON} ]]; then
echo "IPython requires Python 3.5+; please install python3.5 or set PYANGEL_PYTHON" 1>&2
echo "IPython requires Python 2.7+; please install python2.7 or set PYANGEL_PYTHON" 1>&2
exit 1
else
PYANGEL_PYTHON=python
Expand All @@ -41,9 +41,7 @@ export PYANGEL_PYTHON

# Judge if it's local mode
if [[ $1 == "local" ]]; then
export PYANGEL_DIS_MODE=False
else
export PYANGEL_DIS_MODE=True
export PYANGEL_LOCAL_MODE=True
fi

# Add the PyAngel classes to the Python path:
Expand Down
2 changes: 1 addition & 1 deletion angel-ps/conf/angel-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<configuration>
<property>
<name>angel.job.libjars</name>
<value>${ANGEL_HOME}/lib/scala-reflect-2.11.7.jar,${ANGEL_HOME}/lib/memory-0.8.1.jar,${ANGEL_HOME}/lib/sketches-core-0.8.1.jar,${ANGEL_HOME}/lib/commons-pool-1.6.jar,${ANGEL_HOME}/lib/kryo-shaded-4.0.0.jar,${ANGEL_HOME}/lib/kryo-serializers-0.42.jar,${ANGEL_HOME}/lib/scala-library-2.11.8.jar,${ANGEL_HOME}/lib/angel-ps-core-${ANGEL_VERSION}.jar,${ANGEL_HOME}/lib/angel-ps-tools-${ANGEL_VERSION}.jar,${ANGEL_HOME}/lib/angel-ps-mllib-${ANGEL_VERSION}.jar,${ANGEL_HOME}/lib/angel-ps-psf-${ANGEL_VERSION}.jar,${ANGEL_HOME}/lib/angel-ps-examples-${ANGEL_VERSION}.jar,${ANGEL_HOME}/lib/fastutil-7.1.0.jar,${ANGEL_HOME}/lib/sizeof-0.3.0.jar,${ANGEL_HOME}/lib/minlog-1.3.0.jar,${ANGEL_HOME}/lib/breeze_2.11-0.12.jar</value>
<value>${ANGEL_HOME}/lib/scala-reflect-2.11.7.jar,${ANGEL_HOME}/lib/memory-0.8.1.jar,${ANGEL_HOME}/lib/sketches-core-0.8.1.jar,${ANGEL_HOME}/lib/commons-pool-1.6.jar,${ANGEL_HOME}/lib/kryo-shaded-4.0.0.jar,${ANGEL_HOME}/lib/kryo-serializers-0.42.jar,${ANGEL_HOME}/lib/scala-library-2.11.8.jar,${ANGEL_HOME}/lib/angel-ps-core-${ANGEL_VERSION}.jar,${ANGEL_HOME}/lib/angel-ps-tools-${ANGEL_VERSION}.jar,${ANGEL_HOME}/lib/angel-ps-mllib-${ANGEL_VERSION}.jar,${ANGEL_HOME}/lib/angel-ps-psf-${ANGEL_VERSION}.jar,${ANGEL_HOME}/lib/angel-ps-examples-${ANGEL_VERSION}.jar,${ANGEL_HOME}/lib/fastutil-7.1.0.jar,${ANGEL_HOME}/lib/sizeof-0.3.0.jar,${ANGEL_HOME}/lib/minlog-1.3.0.jar,${ANGEL_HOME}/lib/breeze_2.11-0.13.jar</value>
</property>
<property>
<name>mapred.mapper.new-api</name>
Expand Down
2 changes: 1 addition & 1 deletion angel-ps/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<parent>
<groupId>com.tencent.angel</groupId>
<artifactId>angel-ps</artifactId>
<version>1.3.0</version>
<version>1.4.0</version>
</parent>
<artifactId>angel-ps-core</artifactId>
<name>angel-ps-core</name>
Expand Down
16 changes: 16 additions & 0 deletions angel-ps/core/src/main/java/com/tencent/angel/Chore.java
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Tencent is pleased to support the open source community by making Angel available.
*
* Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
*
* Licensed to the Apache Software Foundation (ASF) under one
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ public PartitionKey(int partitionId, int matrixId, int startRow, long startCol,
this.endCol = endCol;
}

/**
 * Create a partition key that carries only the matrix id and the partition id.
 *
 * Note the argument order: this constructor takes the matrix id first, while the
 * constructor it delegates to takes the partition id first. The remaining four
 * arguments of the delegated constructor are all passed as -1
 * (presumably the row/column range of the partition is left unset -- confirm against
 * the full constructor).
 *
 * @param matrixId matrix id
 * @param partId partition id
 */
public PartitionKey(int matrixId, int partId) {
this(partId, matrixId, -1, -1, -1, -1);
}

@Override
public String toString() {
StringBuilder builder = new StringBuilder();
Expand Down
106 changes: 70 additions & 36 deletions angel-ps/core/src/main/java/com/tencent/angel/client/AngelClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

import com.google.protobuf.ServiceException;
import com.tencent.angel.RunningMode;
import com.tencent.angel.common.Location;
import com.tencent.angel.common.location.Location;
import com.tencent.angel.conf.AngelConf;
import com.tencent.angel.conf.MatrixConf;
import com.tencent.angel.exception.AngelException;
Expand All @@ -28,7 +28,7 @@
import com.tencent.angel.ml.matrix.MatrixContext;
import com.tencent.angel.ml.model.MLModel;
import com.tencent.angel.ml.model.PSModel;
import com.tencent.angel.protobuf.RequestConverter;
import com.tencent.angel.protobuf.ProtobufUtil;
import com.tencent.angel.protobuf.generated.ClientMasterServiceProtos.*;
import com.tencent.angel.protobuf.generated.MLProtos.*;
import com.tencent.angel.utils.HdfsUtil;
Expand All @@ -47,9 +47,8 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* Angel application client. It provides the control interfaces for the application.
Expand All @@ -61,7 +60,7 @@ public abstract class AngelClient implements AngelClientInterface {
protected final Configuration conf;

/** matrices used in the application */
private final List<MatrixProto> matrixList;
private final Map<String, MatrixContext> nameToMatrixMap;

/** rpc client to master */
protected volatile MasterProtocol master;
Expand All @@ -83,6 +82,14 @@ public abstract class AngelClient implements AngelClientInterface {
protected Location masterLocation;

private static final DecimalFormat df = new DecimalFormat("#0.000000");

private final String clientId = UUID.randomUUID().toString();

private volatile Thread hbThread;

private final int hbIntervalMS;

private final AtomicBoolean stopped = new AtomicBoolean(false);

/**
*
Expand All @@ -92,9 +99,11 @@ public abstract class AngelClient implements AngelClientInterface {
*/
public AngelClient(Configuration conf){
this.conf = conf;
matrixList = new ArrayList<MatrixProto>();
nameToMatrixMap = new LinkedHashMap<>();
isExecuteFinished = false;
isFinished = false;
hbIntervalMS = conf.getInt(AngelConf.ANGEL_CLIENT_HEARTBEAT_INTERVAL_MS,
AngelConf.DEFAULT_ANGEL_CLIENT_HEARTBEAT_INTERVAL_MS);
}

@SuppressWarnings("rawtypes")
Expand Down Expand Up @@ -139,6 +148,31 @@ public void runTask(String taskClassName) throws AngelException {
}
}

/**
 * Register this client to the Master and start the background heartbeat thread.
 *
 * If the Master has not been connected yet, an error is logged and nothing is started.
 * Otherwise the client is registered with its client id, and a thread named
 * "client-heartbeat" is started which sleeps for the heartbeat interval and then sends a
 * keep-alive request to the Master, until the client is stopped or the thread is interrupted.
 *
 * @throws ServiceException if registering the client to the Master fails
 */
protected void startHeartbeat() throws ServiceException {
  if(master == null) {
    LOG.error("Master has not been connected");
    return;
  }

  master.clientRegister(null, ClientRegisterRequest.newBuilder().setClientId(clientId).build());

  hbThread = new Thread(() -> {
    while(!stopped.get() && !Thread.interrupted()) {
      try {
        Thread.sleep(hbIntervalMS);
        master.keepAlive(null, KeepAliveRequest.newBuilder().setClientId(clientId).build());
      } catch (InterruptedException e) {
        // Thread.sleep clears the interrupt flag when it throws; restore it and leave the
        // loop so that an interrupt always terminates the heartbeat thread.
        Thread.currentThread().interrupt();
        break;
      } catch (Throwable e) {
        // Best effort: a failed heartbeat is reported (with its cause) and retried after
        // the next interval. Failures after stop are expected and not reported.
        if(!stopped.get()) {
          LOG.error("AngelClient " + clientId + " send heartbeat to Master failed", e);
        }
      }
    }
  });

  hbThread.setName("client-heartbeat");
  hbThread.start();
}

@Override
public void run() throws AngelException {
if(master == null) {
Expand All @@ -156,9 +190,14 @@ public void run() throws AngelException {

@Override
public void addMatrix(MatrixContext mContext) throws AngelException {
try{
matrixList.add(mContext.buildMatProto(conf));
} catch (Exception x) {
if (nameToMatrixMap.containsKey(mContext.getName())) {
throw new AngelException("Matrix \"" + mContext.getName() + "\" already exist, please check it");
}

try {
mContext.init(conf);
nameToMatrixMap.put(mContext.getName(), mContext);
} catch (Throwable x) {
throw new AngelException(x);
}
}
Expand Down Expand Up @@ -254,6 +293,18 @@ public void stop(int stateCode) throws AngelException {
stop();
}

/**
 * Stop the client: forget all matrices added so far, reset the finish flags and,
 * the first time this method is called, interrupt the heartbeat thread if one exists.
 *
 * @throws AngelException declared by the client interface
 */
@Override
public void stop() throws AngelException {
  nameToMatrixMap.clear();
  isExecuteFinished = false;
  isFinished = false;

  // Only the first caller that flips the stopped flag interrupts the heartbeat thread.
  boolean alreadyStopped = stopped.getAndSet(true);
  if (alreadyStopped) {
    return;
  }

  Thread heartbeat = hbThread;
  if (heartbeat != null) {
    heartbeat.interrupt();
  }
}

@Override
public void waitForCompletion() throws AngelException{
if(master == null) {
Expand Down Expand Up @@ -515,35 +566,18 @@ private GetJobReportResponse tryGetResponseFromFile(boolean deleteOnExist) throw
}

protected void createMatrices() throws InvalidParameterException, ServiceException {
CreateMatricesRequest createMatricsRequest =
RequestConverter.buildCreateMatricesRequest(matrixList);
master.createMatrices(null, createMatricsRequest);
waitForMatricesCreated(matrixList);
master.createMatrices(null, ProtobufUtil.buildCreateMatricesRequest(new ArrayList<MatrixContext>(nameToMatrixMap.values())));
List<String> matrixNames = new ArrayList<>(nameToMatrixMap.keySet());
waitForMatricesCreated(matrixNames);
}

private void waitForMatricesCreated(List<MatrixProto> matrixList) throws ServiceException {
CheckMatricesCreatedRequest.Builder builder = CheckMatricesCreatedRequest.newBuilder();
int size = matrixList.size();
for(int i = 0; i < size; i++) {
builder.addMatrixNames(matrixList.get(i).getName());
}
CheckMatricesCreatedRequest request = builder.build();

boolean isAllCreated = true;
private void waitForMatricesCreated(List<String> matrixNames) throws ServiceException {
CheckMatricesCreatedRequest request = CheckMatricesCreatedRequest.newBuilder().addAllMatrixNames(matrixNames).build();

int size = matrixNames.size();
while(true) {
CheckMatricesCreatedResponse response = master.checkMatricesCreated(null, request);
List<MatrixStatus> status = response.getStatusList();
assert(size == status.size());

isAllCreated = true;
for(int i = 0; i < size; i++) {
if(status.get(i) != MatrixStatus.M_OK) {
isAllCreated = false;
break;
}
}

if(isAllCreated) {
if(response.getStatus() == 0) {
return;
}

Expand Down Expand Up @@ -763,7 +797,7 @@ protected void waitForAllPS(int psNumber) throws ServiceException, InterruptedEx
boolean isAllPSReady = true;
while(true) {
GetAllPSLocationResponse response = master.getAllPSLocation(null, GetAllPSLocationRequest.newBuilder().build());
List<PSLocation> psLocs = response.getPsLocationsList();
List<PSLocationProto> psLocs = response.getPsLocationsList();
int size = psLocs.size();
if(size == psNumber) {
isAllPSReady = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.tencent.angel.client;

import com.tencent.angel.common.Location;
import com.tencent.angel.common.location.Location;
import org.apache.hadoop.conf.Configuration;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ protected void updateMaster(int maxWaitSeconds) throws Exception {
try {
LOG.info("start to create rpc client to am");
master = connection.getMasterService(masterLocation.getIp(), masterLocation.getPort());
master.ping(null, PingRequest.newBuilder().build());
startHeartbeat();
break;
} catch (ServiceException e) {
Thread.sleep(1000);
Expand All @@ -97,6 +97,7 @@ protected void updateMaster(int maxWaitSeconds) throws Exception {

@Override
public void stop() throws AngelException{
super.stop();
if(cluster != null) {
cluster.stop();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Tencent is pleased to support the open source community by making Angel available.
*
* Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down Expand Up @@ -25,7 +41,7 @@

import com.google.protobuf.ServiceException;
import com.tencent.angel.client.AngelClient;
import com.tencent.angel.common.Location;
import com.tencent.angel.common.location.Location;
import com.tencent.angel.conf.AngelConf;
import com.tencent.angel.exception.AngelException;
import com.tencent.angel.ipc.TConnection;
Expand Down Expand Up @@ -177,6 +193,7 @@ public void startPSServer() throws AngelException {

@Override
public void stop() throws AngelException{
super.stop();
if (yarnClient != null) {
try {
yarnClient.killApplication(appId);
Expand All @@ -191,6 +208,7 @@ public void stop() throws AngelException{
@Override
public void stop(int stateCode) throws AngelException{
LOG.info("stop the application");
super.stop();
if(master != null) {
try {
LOG.info("master is not null, send stop command to Master, stateCode=" + stateCode);
Expand Down Expand Up @@ -394,6 +412,7 @@ private ApplicationSubmissionContext createApplicationSubmissionContext(Configur
DataOutputBuffer dob = new DataOutputBuffer();
ts.writeTokenStorageToStream(dob);
ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
dob.close();

// Setup the command to run the AM
List<String> vargs = new ArrayList<String>(8);
Expand Down Expand Up @@ -537,8 +556,9 @@ protected void updateMaster(int maxWaitSeconds) throws Exception {
masterLocation = new Location(host, port);
LOG.info("start to create rpc client to am");
master = connection.getMasterService(masterLocation.getIp(), masterLocation.getPort());
master.ping(null, PingRequest.newBuilder().build());
startHeartbeat();
} catch (ServiceException e) {
LOG.error("Register to Master failed, ", e);
Thread.sleep(1000);
tryTime++;
continue;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
/*
* Tencent is pleased to support the open source community by making Angel available.
*
* Copyright (C) 2017 THL A29 Limited, a Tencent company. All rights reserved.
*
* Licensed under the BSD 3-Clause License (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
* https://opensource.org/licenses/BSD-3-Clause
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
* either express or implied. See the License for the specific language governing permissions and
* limitations under the License.
*/

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
Expand Down
Loading

0 comments on commit 230b9e5

Please sign in to comment.