From feead8110d5d17f10f08e0101bbc4d0948529a88 Mon Sep 17 00:00:00 2001 From: yunkuntan Date: Thu, 31 Aug 2017 15:27:03 +0800 Subject: [PATCH 1/2] Update ModelParser.md [DOC] improve ModelParser doc. --- docs/apis/ModelParser.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/apis/ModelParser.md b/docs/apis/ModelParser.md index f9001345b..a23fb5899 100644 --- a/docs/apis/ModelParser.md +++ b/docs/apis/ModelParser.md @@ -1,5 +1,5 @@ ### ModelParser -Angel的PSModel在任务结束后,默认会保存为二进制文件,ModelParser类将二进制模型文件解析成明文,可以通过下面命令提交Angel的ModelParser任务: +Angel的PSModel在任务结束后会以二进制文件格式存储,每个partition存为一个文件,文件名为partitionID。一个PSModel的所有partition保存在同一个文件夹,文件夹的名字为PSModel的modelName字段。ModelParser类将一个PSModel的二进制模型文件解析成明文格式,可以通过下面命令提交Angel的ModelParser任务: ``` @@ -46,4 +46,4 @@ Angel的PSModel在任务结束后,默认会保存为二进制文件,ModelPar 6:0.001803243020660425 8:1.9413353447408782E-4 ``` - \ No newline at end of file + From 0b1768ddc3d17353e467fac5968df70e1b3fb878 Mon Sep 17 00:00:00 2001 From: cstur4 Date: Wed, 13 Sep 2017 15:03:30 +0800 Subject: [PATCH 2/2] matrix load and write partition in parallel --- .../com/tencent/angel/conf/MatrixConf.java | 8 ++ .../angel/ps/impl/matrix/ServerMatrix.java | 129 ++++++++++++++++-- 2 files changed, 122 insertions(+), 15 deletions(-) diff --git a/angel-ps/core/src/main/java/com/tencent/angel/conf/MatrixConf.java b/angel-ps/core/src/main/java/com/tencent/angel/conf/MatrixConf.java index d821b1d4d..a9c0d362c 100644 --- a/angel-ps/core/src/main/java/com/tencent/angel/conf/MatrixConf.java +++ b/angel-ps/core/src/main/java/com/tencent/angel/conf/MatrixConf.java @@ -67,4 +67,12 @@ public class MatrixConf { /** matrix data files save path */ public static final String MATRIX_SAVE_PATH = "matrix.save.path"; public static final String DEFAULT_MATRIX_SAVE_PATH = ""; + + /** the number of thread to load matrix partition */ + public static final String MATRIX_LOAD_THREAD = "matrix.load.thread"; + public static final String DEFAULT_MATRIX_LOAD_THREAD = "20"; + + 
/** the number of threads used to write matrix partitions; keyed separately from MATRIX_LOAD_THREAD so the two pools can be tuned independently */ + public static final String MATRIX_WRITE_THREAD = "matrix.write.thread"; + public static final String DEFAULT_MATRIX_WRITE_THREAD = "20"; } diff --git a/angel-ps/core/src/main/java/com/tencent/angel/ps/impl/matrix/ServerMatrix.java b/angel-ps/core/src/main/java/com/tencent/angel/ps/impl/matrix/ServerMatrix.java index 9879393bd..e82b70b0f 100644 --- a/angel-ps/core/src/main/java/com/tencent/angel/ps/impl/matrix/ServerMatrix.java +++ b/angel-ps/core/src/main/java/com/tencent/angel/ps/impl/matrix/ServerMatrix.java @@ -42,6 +42,11 @@ import java.util.List; import java.util.Map; import java.util.Map.Entry; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * The Server matrix on parameter server,assigned by {@link com.tencent.angel.master.AngelApplicationMaster},which represents a set of partitions of matrix @@ -106,16 +111,65 @@ public void loadPartitions() throws IOException { Path matrixPath = new Path(path, matrixName); FileSystem fs = matrixPath.getFileSystem(conf); - for(Entry partEntry : partitionMaps.entrySet()) { - LOG.info("Load partition " + partEntry.getKey() + " from path " + matrixPath); - Path partitionFilePath = new Path(matrixPath, String.valueOf(partEntry.getKey())); - FSDataInputStream input = fs.open(partitionFilePath); - - // Pass the matrix and partition number field - input.readInt(); - input.readInt(); - partEntry.getValue().load(input); - input.close(); + final List> entrys = new ArrayList<>(partitionMaps.entrySet()); + + String matrixLoadThread = + attribute.containsKey(MatrixConf.MATRIX_LOAD_THREAD)? 
+ attribute.get(MatrixConf.MATRIX_LOAD_THREAD).trim(): + MatrixConf.DEFAULT_MATRIX_LOAD_THREAD; + int numThread = Integer.parseInt(matrixLoadThread); + ThreadFactory matrixLoadThreadFacotry = + new ThreadFactoryBuilder().setNameFormat("MatrixLoadTask").build(); + ExecutorService matrixLoadTaskPool = Executors.newFixedThreadPool(numThread, matrixLoadThreadFacotry); + final CountDownLatch taskCount = new CountDownLatch(entrys.size()); + + for (int i = 0; i < numThread; ++i) { + matrixLoadTaskPool.execute(new Runnable() { + int start; + + public Runnable setStart(int start) { + this.start = start; + return this; + } + + @Override + public void run() { + + int workIter = start; + while (workIter < entrys.size()) { + try { + Entry partEntry = entrys.get(workIter); + LOG.info("Load partition " + partEntry.getKey() + + " from path " + matrixPath); + Path partitionFilePath = new Path(matrixPath, + String.valueOf(partEntry.getKey())); + FSDataInputStream input = fs + .open(partitionFilePath); + + // Pass the matrix and partition number field + input.readInt(); + input.readInt(); + partEntry.getValue().load(input); + input.close(); + + // load the part done. 
+ taskCount.countDown(); + workIter += numThread; + } catch (Exception e) { + LOG.fatal("Load partition from path " + matrixPath + + " error:" + e.getMessage()); + } + } + } + + }.setStart(i)); + + } + try{ + matrixLoadTaskPool.shutdown(); + taskCount.await(); + }catch (InterruptedException e) { + LOG.fatal("Load partition failed."); } } @@ -188,11 +242,56 @@ public void writeTo(DataOutputStream output) throws IOException { LOG.debug("writeTo output, matrixId: " + matrixId + ", martitionSize: " + partitionMaps.size()); } - for (Entry entry : partitionMaps.entrySet()) { - LOG.debug("write partitionId: " + entry.getKey()); - output.writeInt(entry.getKey()); - ServerPartition serverPartition = entry.getValue(); - serverPartition.writeTo(output); + final List> entrys = new ArrayList<>(partitionMaps.entrySet()); + + String matrixWriteThread = + attribute.containsKey(MatrixConf.MATRIX_WRITE_THREAD)? + attribute.get(MatrixConf.MATRIX_WRITE_THREAD).trim(): + MatrixConf.DEFAULT_MATRIX_WRITE_THREAD; + int numThread = Integer.parseInt(matrixWriteThread); + ThreadFactory matrixWriteThreadFacotry = + new ThreadFactoryBuilder().setNameFormat("MatrixWriteTask").build(); + ExecutorService matrixWriteTaskPool = Executors.newFixedThreadPool(numThread, matrixWriteThreadFacotry); + final CountDownLatch taskCount = new CountDownLatch(entrys.size()); + + for (int i = 0; i < numThread; ++i) { + matrixWriteTaskPool.execute(new Runnable() { + int start; + + public Runnable setStart(int start) { + this.start = start; + return this; + } + + @Override + public void run() { + + int workIter = start; + while (workIter < entrys.size()) { + try { + Entry partEntry = entrys.get(workIter); + LOG.debug("write partitionId: " + partEntry.getKey()); + output.writeInt(partEntry.getKey()); + ServerPartition serverPartition = partEntry.getValue(); + serverPartition.writeTo(output); + + // write the part done. 
+ taskCount.countDown(); + workIter += numThread; + } catch (Exception e) { + LOG.fatal("write partition error:" + e.getMessage()); + } + } + } + + }.setStart(i)); + + } + try{ + matrixWriteTaskPool.shutdown(); + taskCount.await(); + }catch (InterruptedException e) { + LOG.fatal("Write partition failed."); } }