Skip to content

Commit

Permalink
[GH-55] Improve the unsafe code path performance. (#56)
Browse files Browse the repository at this point in the history
We have discovered some places where the IO buffer is not efficiently
used in the `unsafe` code path. There are also some places in which we
could reduce the number of memory copies.  Here are the details:

1.  In `SplashUnsafeSorter.writeSortedFile`, we used a `writerBuffer` to
hold the serialized data.  And then write the content in this buffer to
the output stream.   To avoid the second copy during the write, we
create our own `SplashBufferedOutputStream` which exposes the internal
buffer so that it could be used by the serializer to fill the serialized
data directly.  By doing this, we could also save the memory used by the
original `writerBuffer`.  It could also improve the testability of the
buffer mechanism.  Unit tests are added for `SplashBufferedOutputStream`
to make sure we manage the buffer correctly.

2.  Replace `IOUtils.copy` with `SplashUtils.copy`.  This function
borrows most of the code from `IOUtils.copy`.  The only difference is
that it allows the user to specify the size of the buffer.  In previous
tests, we identified some 4K IO requests.  Those IO requests are issued
by `IOUtils.copy`.  Because this function uses a fixed 4K IO buffer.
This is not efficient nor elastic in a shared file system or distributed
file system.  This buffer now shares the same Spark configuration
`spark.shuffle.file.buffer`.  What's more, since we already have this IO
buffer.  We could use `InputStream` and `OutputStream` directly instead
of the buffered version.  This helps us to save more memory.  Since the
copy procedure is executed in the same thread, we could safely reuse the
same buffer during the copy.  It helps us reduce the GC time.

3.  Add more performance tests.
  • Loading branch information
jealous authored Aug 13, 2019
1 parent b18dd56 commit 40f7905
Show file tree
Hide file tree
Showing 16 changed files with 426 additions and 122 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ script:
- mvn package -Dspark.version=${SPARK}
env:
matrix:
- SPARK=2.4.0
- SPARK=2.3.2
- SPARK=2.4.3
- SPARK=2.3.3
- SPARK=2.1.0
after_success:
- bash <(curl -s https://codecov.io/bash)
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>com.memverge</groupId>
<artifactId>splash</artifactId>
<version>0.6.2</version>
<version>0.6.3</version>
<name>splash</name>
<description>A shuffle manager that contains a storage interface.</description>
<url>https://github.com/MemVerge/splash/</url>
Expand Down
68 changes: 68 additions & 0 deletions src/main/java/com/memverge/splash/SplashBufferedOutputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2019 MemVerge Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.memverge.splash;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import lombok.val;
import org.apache.spark.unsafe.Platform;

public class SplashBufferedOutputStream extends BufferedOutputStream {

public SplashBufferedOutputStream(OutputStream out) {
this(out, StorageFactoryHolder.getFactory().getFileBufferSize());
}

public SplashBufferedOutputStream(OutputStream out, int size) {
super(out, size);
}

public byte[] getBuffer() {
return buf;
}

public int getBufferSize() {
return buf.length;
}

public int write(byte[] bytes, long offset) throws IOException {
val length = bytes.length - (int) offset;
return write(bytes, Platform.BYTE_ARRAY_OFFSET + offset, length);
}

public int write(Object src, Long srcOffset, int length) throws IOException {
val bufSize = getBufferSize();
int dataRemaining = length;
long offset = srcOffset;
while (dataRemaining > 0) {
val toTransfer = Math.min(bufSize, dataRemaining);
if (count + toTransfer > bufSize && count > 0) {
flush();
}
Platform.copyMemory(
src,
offset,
buf,
Platform.BYTE_ARRAY_OFFSET + count,
toTransfer);
count += toTransfer;
offset += toTransfer;
dataRemaining -= toTransfer;
}
return length - dataRemaining;
}
}
20 changes: 15 additions & 5 deletions src/main/java/com/memverge/splash/TmpShuffleFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*/
package com.memverge.splash;

import java.io.BufferedOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
Expand All @@ -42,16 +41,27 @@ public interface TmpShuffleFile extends ShuffleFile {

OutputStream makeOutputStream();

default BufferedOutputStream makeBufferedOutputStream() {
return new BufferedOutputStream(
makeOutputStream(),
StorageFactoryHolder.getBufferSize());
default int getBufferSize() {
return StorageFactoryHolder.getBufferSize();
}

default SplashBufferedOutputStream makeBufferedOutputStream() {
return makeBufferedOutputStream(getBufferSize());
}

default SplashBufferedOutputStream makeBufferedOutputStream(int bufferSize) {
return new SplashBufferedOutputStream(makeOutputStream(), bufferSize);
}

default DataOutputStream makeBufferedDataOutputStream() {
return new DataOutputStream(makeBufferedOutputStream());
}

default ManualCloseOutputStream makeManualCloseOutputStream(
ShuffleWriteMetrics metrics) {
return new ManualCloseOutputStream(makeOutputStream(), metrics);
}

default ManualCloseOutputStream makeBufferedManualCloseOutputStream(
ShuffleWriteMetrics metrics) {
return new ManualCloseOutputStream(makeBufferedOutputStream(), metrics);
Expand Down
37 changes: 29 additions & 8 deletions src/main/scala/org/apache/spark/shuffle/SplashObjectWriter.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ package org.apache.spark.shuffle

import java.io.OutputStream

import com.memverge.splash.{ShuffleFile, TmpShuffleFile}
import com.memverge.splash.{ShuffleFile, SplashBufferedOutputStream, TmpShuffleFile}
import org.apache.spark.executor.ShuffleWriteMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.serializer.SerializationStream
Expand All @@ -40,9 +40,9 @@ private[spark] class SplashObjectWriter(
noEmptyFile: Boolean = false)
extends OutputStream with Logging {

private lazy val mcs = file.makeBufferedManualCloseOutputStream(writeMetrics)
private lazy val mcs = file.makeManualCloseOutputStream(writeMetrics)

private var bufferedOs: OutputStream = _
private var bufferedOs: SplashBufferedOutputStream = _
private var objOs: SerializationStream = _

val committedPositions: ArrayBuffer[Long] = ArrayBuffer[Long](0)
Expand All @@ -62,14 +62,16 @@ private[spark] class SplashObjectWriter(

private def getObjOut: SerializationStream = {
if (objOs == null) {
objOs = splashSerializer.serializeStream(blockId, mcs)
objOs = splashSerializer.serializeStream(blockId,
new SplashBufferedOutputStream(mcs))
}
objOs
}

private def getBufferedOs: OutputStream = {
private def getBufferedOs: SplashBufferedOutputStream = {
if (bufferedOs == null) {
bufferedOs = splashSerializer.wrap(blockId, mcs)
bufferedOs = new SplashBufferedOutputStream(
splashSerializer.wrap(blockId, mcs))
}
bufferedOs
}
Expand All @@ -84,6 +86,7 @@ private[spark] class SplashObjectWriter(
}
if (bufferedOs != null) {
Utils.tryWithSafeFinally {
bufferedOs.flush()
bufferedOs.close()
} {
bufferedOs = null
Expand All @@ -99,12 +102,18 @@ private[spark] class SplashObjectWriter(
}
}

override def close(): Unit = {
override def close(): Unit = closeAndGet()

def closeAndGet(): Long = {
var committedLen = 0L
Utils.tryWithSafeFinally {
if (notCommittedRecords != 0) commitAndGet()
if (notCommittedRecords != 0) {
committedLen = commitAndGet()
}
} {
closeResources()
}
committedLen
}

def commitAndGet(): Long = {
Expand Down Expand Up @@ -151,5 +160,17 @@ private[spark] class SplashObjectWriter(
recordWritten()
}

def write(src: Object, srcOffset: Long, length: Int): Int = {
val bytesWritten = getBufferedOs.write(src, srcOffset, length)
recordWritten()
bytesWritten
}

override def flush(): Unit = {
if (bufferedOs != null) {
bufferedOs.flush()
}
}

private def recordWritten(): Unit = writeMetrics.incRecordsWritten(1)
}
22 changes: 22 additions & 0 deletions src/main/scala/org/apache/spark/shuffle/SplashOpts.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ object SplashOpts {
.createWithDefault(4096)
})

lazy val unsafeSerializeBufferSize: ConfigEntry[Int] =
createIfNotExists("spark.shuffle.unsafe.serializeBufferSize", builder => {
builder
.doc("The size of the buffer used for record serialization in the unsafe code path.")
.intConf
.createWithDefault(1024 * 1024)
})

lazy val memoryMapThreshold: ConfigEntry[Long] =
createIfNotExists("spark.storage.memoryMapThreshold", builder => {
builder.bytesConf(ByteUnit.BYTE).createWithDefaultString("2m")
Expand All @@ -114,6 +122,20 @@ object SplashOpts {
.createWithDefault(200)
})

lazy val maxExeMemory: ConfigEntry[Int] =
createIfNotExists("spark.shuffle.splash.test.maxExeMemory", builder => {
builder.doc("Max execution memory for test memory manager")
.intConf
.createWithDefault(200 * 1024 * 1024)
})

lazy val maxStorageMemory: ConfigEntry[Int] =
createIfNotExists("spark.shuffle.splash.test.maxStorageMemory", builder => {
builder.doc("Max storage memory for test memory manager")
.intConf
.createWithDefault(100 * 1024 * 1024)
})

private def createIfNotExists[T](
optionKey: String,
f: ConfigBuilder => ConfigEntry[T]): ConfigEntry[T] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import java.security.MessageDigest

import com.google.common.annotations.VisibleForTesting
import com.memverge.splash.{ShuffleFile, StorageFactory, StorageFactoryHolder, TmpShuffleFile}
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.StringUtils
import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -196,7 +195,8 @@ class SplashShuffleBlockResolver(appId: String)
getBlockData(blockId) match {
case Some(BlockDataStreamInfo(is, _)) =>
try {
IOUtils.copy(is, os)
val buffer = new Array[Byte](storageFactory.getFileBufferSize)
SplashUtils.copy(is, os, buffer)
logInfo(s"dump $blockId to $dumpFilePath success.")
} finally {
is.close()
Expand Down
16 changes: 16 additions & 0 deletions src/main/scala/org/apache/spark/shuffle/SplashUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
*/
package org.apache.spark.shuffle

import java.io.{InputStream, OutputStream}
import java.util.Comparator

import org.apache.spark.internal.Logging
Expand Down Expand Up @@ -66,6 +67,21 @@ object SplashUtils extends Logging {
def hash[T](obj: T): Int = {
if (obj == null) 0 else obj.hashCode()
}

def copy(input: InputStream, output: OutputStream, buffer: Array[Byte]): Int = {
var count = 0L
var n = input.read(buffer)
val endOfFile = -1
while (endOfFile != n) {
output.write(buffer, 0, n)
count += n
n = input.read(buffer)
}
if (count > Integer.MAX_VALUE) {
return -1
}
count.intValue
}
}


Expand Down
Loading

0 comments on commit 40f7905

Please sign in to comment.