Skip to content

Commit

Permalink
Merge pull request Angel-ML#201 from cstur4/branch-1.1.0
Browse files Browse the repository at this point in the history
ServerMatrix load and write in parallel
  • Loading branch information
paynie authored Sep 14, 2017
2 parents d8b06c4 + 9fa5cfd commit 4f75d3f
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,12 @@ public class MatrixConf {
/** matrix data files save path */
public static final String MATRIX_SAVE_PATH = "matrix.save.path";
public static final String DEFAULT_MATRIX_SAVE_PATH = "";

/** the number of threads used to load matrix partitions */
public static final String MATRIX_LOAD_THREAD = "matrix.load.thread";
public static final String DEFAULT_MATRIX_LOAD_THREAD = "20";

/**
 * the number of threads used to write matrix partitions; this must be a different key from
 * {@link #MATRIX_LOAD_THREAD}, otherwise the two pool sizes cannot be configured independently
 */
public static final String MATRIX_WRITE_THREAD = "matrix.write.thread";
public static final String DEFAULT_MATRIX_WRITE_THREAD = "20";
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* The server matrix on a parameter server, assigned by {@link com.tencent.angel.master.AngelApplicationMaster}, which represents a set of partitions of a matrix
Expand Down Expand Up @@ -106,16 +111,65 @@ public void loadPartitions() throws IOException {
Path matrixPath = new Path(path, matrixName);
FileSystem fs = matrixPath.getFileSystem(conf);

for(Entry<Integer, ServerPartition> partEntry : partitionMaps.entrySet()) {
LOG.info("Load partition " + partEntry.getKey() + " from path " + matrixPath);
Path partitionFilePath = new Path(matrixPath, String.valueOf(partEntry.getKey()));
FSDataInputStream input = fs.open(partitionFilePath);

// Pass the matrix and partition number field
input.readInt();
input.readInt();
partEntry.getValue().load(input);
input.close();
// Snapshot the partitions into a list so that workers can address them by index.
final List<Map.Entry<Integer, ServerPartition>> entrys = new ArrayList<>(partitionMaps.entrySet());

// The pool size comes from the matrix attributes, falling back to the default.
String matrixLoadThread =
    attribute.containsKey(MatrixConf.MATRIX_LOAD_THREAD) ?
        attribute.get(MatrixConf.MATRIX_LOAD_THREAD).trim() :
        MatrixConf.DEFAULT_MATRIX_LOAD_THREAD;
final int numThread = Integer.parseInt(matrixLoadThread);
ThreadFactory matrixLoadThreadFacotry =
    new ThreadFactoryBuilder().setNameFormat("MatrixLoadTask").build();
ExecutorService matrixLoadTaskPool = Executors.newFixedThreadPool(numThread, matrixLoadThreadFacotry);
// One count per partition; every partition counts down exactly once, whether it succeeds or fails.
final CountDownLatch taskCount = new CountDownLatch(entrys.size());
// First failure raised by any worker; rethrown to the caller after all workers are done.
final java.util.concurrent.atomic.AtomicReference<Throwable> loadError =
    new java.util.concurrent.atomic.AtomicReference<Throwable>();

for (int i = 0; i < numThread; ++i) {
  final int start = i;
  matrixLoadTaskPool.execute(new Runnable() {
    @Override
    public void run() {
      // Worker "start" handles entries start, start + numThread, start + 2 * numThread, ...
      // The index advances in the loop header, so a failing partition is never retried forever.
      for (int workIter = start; workIter < entrys.size(); workIter += numThread) {
        Entry<Integer, ServerPartition> partEntry = entrys.get(workIter);
        try {
          if (loadError.get() != null) {
            // Another partition already failed: skip the work but still count down (see finally).
            continue;
          }
          LOG.info("Load partition " + partEntry.getKey()
              + " from path " + matrixPath);
          Path partitionFilePath = new Path(matrixPath,
              String.valueOf(partEntry.getKey()));

          // try-with-resources closes the stream even when load() throws.
          try (FSDataInputStream input = fs.open(partitionFilePath)) {
            // Pass the matrix and partition number field
            input.readInt();
            input.readInt();
            partEntry.getValue().load(input);
          }
        } catch (Throwable e) {
          loadError.compareAndSet(null, e);
          LOG.fatal("Load partition " + partEntry.getKey() + " from path " + matrixPath
              + " error:" + e.getMessage(), e);
        } finally {
          // Always count down, otherwise taskCount.await() below would block forever.
          taskCount.countDown();
        }
      }
    }
  });
}

matrixLoadTaskPool.shutdown();
try {
  taskCount.await();
} catch (InterruptedException e) {
  LOG.fatal("Load partition failed.");
  matrixLoadTaskPool.shutdownNow();
  // Restore the interrupt flag so that callers further up can observe the interruption.
  Thread.currentThread().interrupt();
  throw new IOException("Interrupted while loading partitions from path " + matrixPath, e);
}

// Surface worker failures to the caller instead of returning a partially loaded matrix.
Throwable loadFailure = loadError.get();
if (loadFailure != null) {
  throw new IOException("Load partition from path " + matrixPath + " failed", loadFailure);
}
}

Expand Down Expand Up @@ -188,11 +242,56 @@ public void writeTo(DataOutputStream output) throws IOException {
LOG.debug("writeTo output, matrixId: " + matrixId + ", martitionSize: "
+ partitionMaps.size());
}
for (Entry<Integer, ServerPartition> entry : partitionMaps.entrySet()) {
LOG.debug("write partitionId: " + entry.getKey());
output.writeInt(entry.getKey());
ServerPartition serverPartition = entry.getValue();
serverPartition.writeTo(output);
// Snapshot the partitions into a list so that workers can address them by index.
final List<Map.Entry<Integer, ServerPartition>> entrys = new ArrayList<>(partitionMaps.entrySet());

// The pool size comes from the matrix attributes, falling back to the default.
String matrixWriteThread =
    attribute.containsKey(MatrixConf.MATRIX_WRITE_THREAD) ?
        attribute.get(MatrixConf.MATRIX_WRITE_THREAD).trim() :
        MatrixConf.DEFAULT_MATRIX_WRITE_THREAD;
final int numThread = Integer.parseInt(matrixWriteThread);
ThreadFactory matrixWriteThreadFacotry =
    new ThreadFactoryBuilder().setNameFormat("MatrixWriteTask").build();
ExecutorService matrixWriteTaskPool = Executors.newFixedThreadPool(numThread, matrixWriteThreadFacotry);
// One count per partition; every partition counts down exactly once, whether it succeeds or fails.
final CountDownLatch taskCount = new CountDownLatch(entrys.size());
// A single stream cannot be written by several threads at once: the id and payload of different
// partitions would interleave. Workers therefore serialize into one private buffer per partition,
// and only the calling thread touches "output". NOTE(review): this holds all serialized
// partitions in memory at the same time; confirm that is acceptable for large matrices.
final java.io.ByteArrayOutputStream[] buffers = new java.io.ByteArrayOutputStream[entrys.size()];
// First failure raised by any worker; rethrown to the caller after all workers are done.
final java.util.concurrent.atomic.AtomicReference<Throwable> writeError =
    new java.util.concurrent.atomic.AtomicReference<Throwable>();

for (int i = 0; i < numThread; ++i) {
  final int start = i;
  matrixWriteTaskPool.execute(new Runnable() {
    @Override
    public void run() {
      // Worker "start" handles entries start, start + numThread, start + 2 * numThread, ...
      // The index advances in the loop header, so a failing partition is never retried forever.
      for (int workIter = start; workIter < entrys.size(); workIter += numThread) {
        Entry<Integer, ServerPartition> partEntry = entrys.get(workIter);
        try {
          if (writeError.get() != null) {
            // Another partition already failed: skip the work but still count down (see finally).
            continue;
          }
          LOG.debug("write partitionId: " + partEntry.getKey());
          java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
          DataOutputStream partOutput = new DataOutputStream(buffer);
          partOutput.writeInt(partEntry.getKey());
          ServerPartition serverPartition = partEntry.getValue();
          serverPartition.writeTo(partOutput);
          partOutput.flush();
          buffers[workIter] = buffer;
        } catch (Throwable e) {
          writeError.compareAndSet(null, e);
          LOG.fatal("write partition error:" + e.getMessage(), e);
        } finally {
          // Always count down, otherwise taskCount.await() below would block forever.
          taskCount.countDown();
        }
      }
    }
  });
}

matrixWriteTaskPool.shutdown();
try {
  // Returning from await() also makes the workers' writes to "buffers" visible to this thread.
  taskCount.await();
} catch (InterruptedException e) {
  LOG.fatal("Write partition failed.");
  matrixWriteTaskPool.shutdownNow();
  // Restore the interrupt flag so that callers further up can observe the interruption.
  Thread.currentThread().interrupt();
  throw new IOException("Interrupted while writing partitions of matrix " + matrixId, e);
}

// Surface worker failures to the caller instead of emitting a truncated stream.
Throwable writeFailure = writeError.get();
if (writeFailure != null) {
  throw new IOException("Write partitions of matrix " + matrixId + " failed", writeFailure);
}

// Emit the partitions one after another, in the same order as partitionMaps.entrySet().
for (java.io.ByteArrayOutputStream buffer : buffers) {
  buffer.writeTo(output);
}
}

Expand Down
2 changes: 1 addition & 1 deletion docs/apis/ModelParser.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
### ModelParser
Angel的PSModel在任务结束后会以二进制文件格式存储,每个partition存为一个文件,文件名为partitionID。一个PSModel的所有partition保存在同一个文件夹,文件夹的名字为PSModel的modelName字段。ModelParser类将一个PSModel的二进制文件解析成明文格式,可以通过下面命令提交Angel的ModelParser任务:

Angel的PSModel在任务结束后会以二进制文件格式存储,每个partition存为一个文件,文件名为partitionID。一个PSModel的所有partition保存在同一个文件夹,文件夹的名字为PSModel的modelName字段。ModelParser类将一个PSModel的二进制模型文件解析成明文格式,可以通过下面命令提交Angel的ModelParser任务:

```
./bin/angel-submit \
Expand Down

0 comments on commit 4f75d3f

Please sign in to comment.