Skip to content

Commit

Permalink
Support tier in remote packet streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
peisun1115 committed Jan 6, 2017
1 parent 32a2b3a commit 1ab5258
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ public static BlockOutStream createRemoteBlockOutStream(long blockId, long block

PacketOutStream outStream = PacketOutStream
.createNettyPacketOutStream(context, client.getDataServerAddress(), client.getSessionId(),
blockId, blockSize, Protocol.RequestType.ALLUXIO_BLOCK);
blockId, blockSize, options.getWriteTier(), Protocol.RequestType.ALLUXIO_BLOCK);
closer.register(outStream);
return new BlockOutStream(outStream, blockId, blockSize, client, options);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public final class NettyPacketWriter implements PacketWriter {
private final InetSocketAddress mAddress;
private final long mId;
private final long mSessionId;
private final int mTier;
private final Protocol.RequestType mRequestType;
private final long mLength;

Expand Down Expand Up @@ -109,17 +110,19 @@ public final class NettyPacketWriter implements PacketWriter {
* @param id the block ID or UFS file ID
* @param length the length of the block or file to write, set to Long.MAX_VALUE if unknown
* @param sessionId the session ID
* @param tier the target tier
* @param type the request type (block or UFS file)
* @throws IOException it fails to acquire a netty channel
*/
public NettyPacketWriter(FileSystemContext context, final InetSocketAddress address, long id,
long length, long sessionId, Protocol.RequestType type) throws IOException {
long length, long sessionId, int tier, Protocol.RequestType type) throws IOException {
mContext = context;
mAddress = address;
mSessionId = sessionId;
mId = id;
mRequestType = type;
mLength = length;
mTier = tier;
mChannel = context.acquireNettyChannel(address);

ChannelPipeline pipeline = mChannel.pipeline();
Expand Down Expand Up @@ -175,12 +178,26 @@ public void writePacket(final ByteBuf buf) throws IOException {
mLock.unlock();
}

<<<<<<< 32a2b3ae03da81cb85de4b943a5c51bf86a9214a
Protocol.WriteRequest writeRequest =
Protocol.WriteRequest.newBuilder().setId(mId).setOffset(offset).setSessionId(mSessionId)
.setType(mRequestType).build();
DataBuffer dataBuffer = new DataNettyBufferV2(buf);
mChannel.writeAndFlush(new RPCProtoMessage(writeRequest, dataBuffer))
.addListener(new WriteListener(offset + len));
=======
mChannel.eventLoop().submit(new Runnable() {
@Override
public void run() {
Protocol.WriteRequest writeRequest =
Protocol.WriteRequest.newBuilder().setId(mId).setOffset(offset).setSessionId(mSessionId)
.setTier(mTier).setType(mRequestType).build();
DataBuffer dataBuffer = new DataNettyBufferV2(buf);
mChannel.writeAndFlush(new RPCProtoMessage(writeRequest, dataBuffer))
.addListener(new WriteListener(offset + len));
}
});
>>>>>>> Support tier in remote packet streaming
}

@Override
Expand Down Expand Up @@ -299,7 +316,7 @@ private void sendEOF() {
// Write the last packet.
Protocol.WriteRequest writeRequest =
Protocol.WriteRequest.newBuilder().setId(mId).setOffset(pos).setSessionId(mSessionId)
.setType(mRequestType).build();
.setTier(mTier).setType(mRequestType).build();
mChannel.writeAndFlush(new RPCProtoMessage(writeRequest, null))
.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,16 @@ public static PacketOutStream createLocalPacketOutStream(BlockWorkerClient clien
* @param sessionId the session ID
* @param id the ID (block ID or UFS file ID)
* @param length the block or file length
* @param tier the target tier
* @param type the request type (either block write or UFS file write)
* @return the {@link PacketOutStream} created
* @throws IOException if it fails to create the object
*/
public static PacketOutStream createNettyPacketOutStream(FileSystemContext context,
InetSocketAddress address, long sessionId, long id, long length, Protocol.RequestType type)
throws IOException {
InetSocketAddress address, long sessionId, long id, long length, int tier,
Protocol.RequestType type) throws IOException {
NettyPacketWriter packetWriter =
new NettyPacketWriter(context, address, id, length, sessionId, type);
new NettyPacketWriter(context, address, id, length, sessionId, tier, type);
return new PacketOutStream(packetWriter, length);
}

Expand All @@ -105,7 +106,7 @@ public static PacketOutStream createReplicatedPacketOutStream(FileSystemContext
packetWriters.add(LocalFilePacketWriter.create(client, id, tier));
} else {
packetWriters.add(new NettyPacketWriter(context, client.getDataServerAddress(), id, length,
client.getSessionId(), type));
client.getSessionId(), tier, type));
}
}
return new PacketOutStream(packetWriters, length);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class UnderFileSystemFileOutStream extends FilterOutputStream {
public UnderFileSystemFileOutStream(FileSystemContext context, InetSocketAddress address,
long ufsFileId) throws IOException {
super(PacketOutStream
.createNettyPacketOutStream(context, address, -1, ufsFileId, Long.MAX_VALUE,
.createNettyPacketOutStream(context, address, -1, ufsFileId, Long.MAX_VALUE, -1,
Protocol.RequestType.UFS_FILE));
mOutStream = (PacketOutStream) out;
}
Expand Down
138 changes: 126 additions & 12 deletions core/common/src/main/java/alluxio/proto/dataserver/Protocol.java

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/common/src/proto/dataserver/protocol.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,9 @@ message WriteRequest {
optional int64 id = 2;
optional int64 offset = 3;

// These are only applicable for block write.
optional int64 session_id = 4;
optional int32 tier = 5;
}

// The response.
Expand Down
Loading

0 comments on commit 1ab5258

Please sign in to comment.