Skip to content

Commit

Permalink
Address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Jan 12, 2017
1 parent 3c6d032 commit 7aaea8e
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@
* A special version of {@link EmbeddedChannel} that doesn't fail on exception so that we can
* still check result after the channel is closed.
*/
public final class EmbeddedChannelNoException extends EmbeddedChannel {
public final class EmbeddedNoExceptionChannel extends EmbeddedChannel {
/**
* @param handlers the handlers
*/
public EmbeddedChannelNoException(ChannelHandler... handlers) {
public EmbeddedNoExceptionChannel(ChannelHandler... handlers) {
super(handlers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package alluxio.worker.netty;

import alluxio.EmbeddedChannelNoException;
import alluxio.EmbeddedNoExceptionChannel;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.proto.dataserver.Protocol;
import alluxio.worker.block.BlockWorker;
Expand Down Expand Up @@ -41,11 +41,14 @@ public void before() throws Exception {
mChannel = new EmbeddedChannel(
new DataServerBlockReadHandler(NettyExecutors.BLOCK_READER_EXECUTOR, mBlockWorker,
FileTransferType.MAPPED));
mChannelNoException = new EmbeddedChannelNoException(
mChannelNoException = new EmbeddedNoExceptionChannel(
new DataServerBlockReadHandler(NettyExecutors.BLOCK_READER_EXECUTOR, mBlockWorker,
FileTransferType.MAPPED));
}

/**
* Tests the {@link FileTransferType#TRANSFER} type.
*/
@Test
public void transferType() throws Exception {
mChannel = new EmbeddedChannel(
Expand All @@ -66,6 +69,9 @@ public void transferType() throws Exception {
mBlockReader.close();
}

/**
* Tests read failure.
*/
@Test
public void readFailure() throws Exception {
long fileSize = PACKET_SIZE * 10 + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package alluxio.worker.netty;

import alluxio.EmbeddedChannelNoException;
import alluxio.EmbeddedNoExceptionChannel;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataNettyBufferV2;
Expand Down Expand Up @@ -53,10 +53,13 @@ public void before() throws Exception {

mChannel = new EmbeddedChannel(
new DataServerBlockWriteHandler(NettyExecutors.BLOCK_WRITER_EXECUTOR, mBlockWorker));
mChannelNoException = new EmbeddedChannelNoException(
mChannelNoException = new EmbeddedNoExceptionChannel(
new DataServerBlockWriteHandler(NettyExecutors.BLOCK_WRITER_EXECUTOR, mBlockWorker));
}

/**
* Tests write failure.
*/
@Test
public void writeFailure() throws Exception {
mChannelNoException.writeInbound(buildWriteRequest(0, PACKET_SIZE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,19 @@ public abstract class DataServerReadHandlerTest {
@Rule
public TemporaryFolder mTestFolder = new TemporaryFolder();

/**
* Reads all bytes of a file.
*/
@Test
public void readFullFile() throws Exception {
long checksumExpected = populateInputFile(PACKET_SIZE * 10, 0, PACKET_SIZE * 10 - 1);
mChannel.writeInbound(buildReadRequest(0, PACKET_SIZE * 10));
checkAllReadResponses(mChannel, checksumExpected);
}

/**
* Reads a sub-region of a file.
*/
@Test
public void readPartialFile() throws Exception {
long start = 3;
Expand All @@ -64,6 +70,9 @@ public void readPartialFile() throws Exception {
checkAllReadResponses(mChannel, checksumExpected);
}

/**
* Handles multiple read requests within a channel sequentially.
*/
@Test
public void reuseChannel() throws Exception {
long fileSize = PACKET_SIZE * 5;
Expand All @@ -79,6 +88,9 @@ public void reuseChannel() throws Exception {
checkAllReadResponses(mChannel, checksumExpected);
}

/**
* Fails if the read request tries to read an empty file.
*/
@Test
public void readEmptyFile() throws Exception {
populateInputFile(0, 0, 0);
Expand All @@ -87,6 +99,9 @@ public void readEmptyFile() throws Exception {
checkReadResponse(response, Protocol.Status.Code.INVALID_ARGUMENT);
}

/**
* Cancels the read request immediately after the read request is sent.
*/
@Test
public void cancelRequest() throws Exception {
long fileSize = PACKET_SIZE * 10 + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package alluxio.worker.netty;

import alluxio.EmbeddedChannelNoException;
import alluxio.EmbeddedNoExceptionChannel;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.proto.dataserver.Protocol;
import alluxio.worker.file.FileSystemWorker;
Expand All @@ -36,7 +36,7 @@ public void before() throws Exception {
mFileSystemWorker = Mockito.mock(FileSystemWorker.class);
mChannel = new EmbeddedChannel(
new DataServerUFSFileReadHandler(NettyExecutors.FILE_READER_EXECUTOR, mFileSystemWorker));
mChannelNoException = new EmbeddedChannelNoException(
mChannelNoException = new EmbeddedNoExceptionChannel(
new DataServerUFSFileReadHandler(NettyExecutors.FILE_READER_EXECUTOR, mFileSystemWorker));
}

Expand All @@ -45,6 +45,9 @@ public void after() throws Exception {
mInputStream.close();
}

/**
* Tests read failure.
*/
@Test
public void readFailure() throws Exception {
long fileSize = PACKET_SIZE * 10 + 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

package alluxio.worker.netty;

import alluxio.EmbeddedChannelNoException;
import alluxio.EmbeddedNoExceptionChannel;
import alluxio.network.protocol.RPCProtoMessage;
import alluxio.network.protocol.databuffer.DataBuffer;
import alluxio.network.protocol.databuffer.DataNettyBufferV2;
Expand Down Expand Up @@ -47,7 +47,7 @@ public void before() throws Exception {
mChecksum = 0;
mChannel = new EmbeddedChannel(
new DataServerUFSFileWriteHandler(NettyExecutors.FILE_WRITER_EXECUTOR, mFileSystemWorker));
mChannelNoException = new EmbeddedChannelNoException(
mChannelNoException = new EmbeddedNoExceptionChannel(
new DataServerUFSFileWriteHandler(NettyExecutors.FILE_WRITER_EXECUTOR, mFileSystemWorker));
}

Expand All @@ -56,6 +56,9 @@ public void after() throws Exception {
mOutputStream.close();
}

/**
* Tests write failure.
*/
@Test
public void writeFailure() throws Exception {
mChannelNoException.writeInbound(buildWriteRequest(0, PACKET_SIZE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ public abstract class DataServerWriteHandlerTest {
@Rule
public TemporaryFolder mTestFolder = new TemporaryFolder();

/**
* Writes an empty file.
*/
@Test
public void writeEmptyFile() throws Exception {
mChannel.writeInbound(buildWriteRequest(0, 0));
Expand All @@ -49,6 +52,9 @@ public void writeEmptyFile() throws Exception {
checkWriteResponse(writeResponse, Protocol.Status.Code.OK);
}

/**
* Writes an non-empty file.
*/
@Test
public void writeNonEmptyFile() throws Exception {
long len = 0;
Expand All @@ -64,6 +70,9 @@ public void writeNonEmptyFile() throws Exception {
checkFileContent(len);
}

/**
* Fails if the write request contains an invalid offset.
*/
@Test
public void writeInvalidOffset() throws Exception {
mChannelNoException.writeInbound(buildWriteRequest(0, PACKET_SIZE));
Expand Down

0 comments on commit 7aaea8e

Please sign in to comment.