Skip to content

Commit

Permalink
Merge pull request Alluxio#4546 from jsimsa/tier
Browse files Browse the repository at this point in the history
[SMALLFIX] Generalizing the write tier index API.
  • Loading branch information
calvinjia authored Jan 4, 2017
2 parents ab6d063 + ad8be0f commit 786c7e2
Show file tree
Hide file tree
Showing 25 changed files with 206 additions and 283 deletions.
79 changes: 0 additions & 79 deletions core/client/src/main/java/alluxio/client/WriteTier.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,12 @@ public interface BlockWorkerClient extends Closeable {
*
* @param blockId the ID of the block
* @param initialBytes the initial size bytes allocated for the block
* @param tier the target tier
* @return the temporary path of the block
* @throws IOException if a non-Alluxio exception occurs
*/
String requestBlockLocation(final long blockId, final long initialBytes) throws IOException;
String requestBlockLocation(final long blockId, final long initialBytes, final int tier)
throws IOException;

/**
* Requests space for some block from worker.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public LocalBlockOutStream(long blockId,
try {
mBlockWorkerClient = mCloser.register(context.createBlockWorkerClient(workerNetAddress));
long initialSize = Configuration.getBytes(PropertyKey.USER_FILE_BUFFER_BYTES);
String blockPath = mBlockWorkerClient.requestBlockLocation(mBlockId, initialSize);
String blockPath =
mBlockWorkerClient.requestBlockLocation(mBlockId, initialSize, options.getWriteTier());
mReservedBytes += initialSize;
mWriter = new LocalFileBlockWriter(blockPath);
mCloser.register(mWriter);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,13 @@
import alluxio.Constants;
import alluxio.PropertyKey;
import alluxio.RuntimeConstants;
import alluxio.client.WriteTier;
import alluxio.exception.AlluxioException;
import alluxio.exception.ExceptionMessage;
import alluxio.exception.WorkerOutOfSpaceException;
import alluxio.metrics.MetricsSystem;
import alluxio.thrift.AlluxioTException;
import alluxio.thrift.BlockWorkerClientService;
import alluxio.thrift.ThriftIOException;
import alluxio.thrift.TWriteTier;
import alluxio.util.ThreadFactoryUtils;
import alluxio.util.network.NetworkAddressUtils;
import alluxio.wire.LockBlockResult;
Expand Down Expand Up @@ -67,8 +65,6 @@ public final class RetryHandlingBlockWorkerClient
ThreadFactoryUtils.build("block-worker-heartbeat-cancel-%d", true));
private final BlockWorkerThriftClientPool mClientPool;
private final BlockWorkerThriftClientPool mClientHeartbeatPool;
private static final TWriteTier WRITE_TIER = WriteTier.toThrift(
alluxio.Configuration.getEnum(PropertyKey.USER_FILE_WRITE_TIER_DEFAULT, WriteTier.class));
// Tracks the number of active heartbeat close requests.
private static final AtomicInteger NUM_ACTIVE_SESSIONS = new AtomicInteger(0);

Expand Down Expand Up @@ -249,15 +245,15 @@ public Void call(BlockWorkerClientService.Client client)
}

@Override
public String requestBlockLocation(final long blockId, final long initialBytes)
throws IOException {
public String requestBlockLocation(final long blockId, final long initialBytes,
final int writeTier) throws IOException {
try {
return retryRPC(
new RpcCallableThrowsAlluxioTException<String, BlockWorkerClientService.Client>() {
@Override
public String call(BlockWorkerClientService.Client client)
throws AlluxioTException, TException {
return client.requestBlockLocation(getSessionId(), blockId, initialBytes, WRITE_TIER);
return client.requestBlockLocation(getSessionId(), blockId, initialBytes, writeTier);
}
});
} catch (WorkerOutOfSpaceException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ public static BlockOutStream createLocalBlockOutStream(long blockId, long blockS
Closer closer = Closer.create();
try {
BlockWorkerClient client = closer.register(context.createBlockWorkerClient(workerNetAddress));
PacketOutStream outStream =
PacketOutStream.createLocalPacketOutStream(client, blockId, blockSize);
PacketOutStream outStream = PacketOutStream
.createLocalPacketOutStream(client, blockId, blockSize, options.getWriteTier());
closer.register(outStream);
return new BlockOutStream(outStream, blockId, blockSize, client, options);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,13 @@ public final class LocalFilePacketWriter implements PacketWriter {
*
* @param blockWorkerClient the block worker client, not owned by this class
* @param blockId the block ID
* @param tier the target tier
* @throws IOException if it fails to create the packet writer
* @return the {@link LocalFilePacketWriter} created
*/
public static LocalFilePacketWriter create(BlockWorkerClient blockWorkerClient,
long blockId) throws IOException {
return new LocalFilePacketWriter(blockWorkerClient, blockId);
long blockId, int tier) throws IOException {
return new LocalFilePacketWriter(blockWorkerClient, blockId, tier);
}

@Override
Expand Down Expand Up @@ -104,11 +105,13 @@ public void close() throws IOException {
*
* @param blockWorkerClient the block worker client, not owned by this class
* @param blockId the block ID
* @param tier the target tier
* @throws IOException if it fails to create the packet writer
*/
private LocalFilePacketWriter(BlockWorkerClient blockWorkerClient, long blockId)
private LocalFilePacketWriter(BlockWorkerClient blockWorkerClient, long blockId, int tier)
throws IOException {
String blockPath = blockWorkerClient.requestBlockLocation(blockId, FILE_BUFFER_BYTES);
String blockPath =
blockWorkerClient.requestBlockLocation(blockId, FILE_BUFFER_BYTES, tier);
mWriter = new LocalFileBlockWriter(blockPath);
mPosReserved += FILE_BUFFER_BYTES;
mBlockId = blockId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@ public final class PacketOutStream extends OutputStream implements BoundedStream
* @param client the block worker client
* @param id the ID
* @param length the block or file length
* @param tier the target tier
* @return the {@link PacketOutStream} created
* @throws IOException if it fails to create the object
*/
public static PacketOutStream createLocalPacketOutStream(BlockWorkerClient client,
long id, long length) throws IOException {
PacketWriter packetWriter = LocalFilePacketWriter.create(client, id);
long id, long length, int tier) throws IOException {
PacketWriter packetWriter = LocalFilePacketWriter.create(client, id, tier);
return new PacketOutStream(packetWriter, length);
}

Expand Down Expand Up @@ -88,19 +89,20 @@ public static PacketOutStream createNettyPacketOutStream(FileSystemContext conte
* @param clients a list of block worker clients
* @param id the ID (block ID or UFS file ID)
* @param length the block or file length
* @param tier the target tier
* @param type the request type (either block write or UFS file write)
* @return the {@link PacketOutStream} created
* @throws IOException if it fails to create the object
*/
public static PacketOutStream createReplicatedPacketOutStream(FileSystemContext context,
List<BlockWorkerClient> clients, long id, long length,
List<BlockWorkerClient> clients, long id, long length, int tier,
Protocol.RequestType type) throws IOException {
String localHost = NetworkAddressUtils.getLocalHostName();

List<PacketWriter> packetWriters = new ArrayList<>();
for (BlockWorkerClient client : clients) {
if (client.getWorkerNetAddress().getHost().equals(localHost)) {
packetWriters.add(LocalFilePacketWriter.create(client, id));
packetWriters.add(LocalFilePacketWriter.create(client, id, tier));
} else {
packetWriters.add(new NettyPacketWriter(context, client.getDataServerAddress(), id, length,
client.getSessionId(), type));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class CreateFileOptions {
private long mTtl;
private TtlAction mTtlAction;
private Mode mMode; // null if creating the file using system default mode
private int mWriteTier;
private WriteType mWriteType;

/**
Expand All @@ -62,6 +63,7 @@ private CreateFileOptions() {
} catch (Exception e) {
throw Throwables.propagate(e);
}
mWriteTier = Configuration.getInt(PropertyKey.USER_FILE_WRITE_TIER_DEFAULT);
mWriteType = Configuration.getEnum(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class);
mTtl = Constants.NO_TTL;
mTtlAction = TtlAction.DELETE;
Expand Down Expand Up @@ -112,6 +114,13 @@ public Mode getMode() {
return mMode;
}

/**
* @return the write tier
*/
public int getWriteTier() {
return mWriteTier;
}

/**
* @return the write type
*/
Expand Down Expand Up @@ -199,6 +208,15 @@ public CreateFileOptions setTtlAction(TtlAction ttlAction) {
return this;
}

/**
* @param writeTier the write tier to use for this operation
* @return the updated options object
*/
public CreateFileOptions setWriteTier(int writeTier) {
mWriteTier = writeTier;
return this;
}

/**
* @param writeType the {@link WriteType} to use for this operation. This will override both the
* {@link AlluxioStorageType} and {@link UnderStorageType}.
Expand All @@ -219,6 +237,7 @@ public OutStreamOptions toOutStreamOptions() {
.setMode(mMode)
.setTtl(mTtl)
.setTtlAction(mTtlAction)
.setWriteTier(mWriteTier)
.setWriteType(mWriteType);
}

Expand All @@ -237,13 +256,14 @@ public boolean equals(Object o) {
&& Objects.equal(mMode, that.mMode)
&& Objects.equal(mTtl, that.mTtl)
&& Objects.equal(mTtlAction, that.mTtlAction)
&& mWriteTier == that.mWriteTier
&& Objects.equal(mWriteType, that.mWriteType);
}

@Override
public int hashCode() {
return Objects.hashCode(mRecursive, mBlockSizeBytes, mLocationPolicy, mMode, mTtl,
mTtlAction, mWriteType);
mTtlAction, mWriteTier, mWriteType);
}

@Override
Expand All @@ -255,6 +275,7 @@ public String toString() {
.add("mode", mMode)
.add("ttl", mTtl)
.add("ttlAction", mTtlAction)
.add("writeTier", mWriteTier)
.add("writeType", mWriteType)
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public final class OutStreamOptions {
private long mTtl;
private TtlAction mTtlAction;
private FileWriteLocationPolicy mLocationPolicy;
private int mWriteTier;
private WriteType mWriteType;
private Permission mPermission;
private String mUfsPath;
Expand All @@ -64,6 +65,7 @@ private OutStreamOptions() {
} catch (Exception e) {
throw Throwables.propagate(e);
}
mWriteTier = Configuration.getInt(PropertyKey.USER_FILE_WRITE_TIER_DEFAULT);
mWriteType = Configuration.getEnum(PropertyKey.USER_FILE_WRITE_TYPE_DEFAULT, WriteType.class);
mPermission = Permission.defaults();
try {
Expand Down Expand Up @@ -131,6 +133,13 @@ public String getUfsPath() {
return mUfsPath;
}

/**
* @return the write tier
*/
public int getWriteTier() {
return mWriteTier;
}

/**
* @return the write type
*/
Expand Down Expand Up @@ -180,6 +189,17 @@ public OutStreamOptions setLocationPolicy(FileWriteLocationPolicy locationPolicy
return this;
}

/**
* Sets the write tier.
*
* @param writeTier the write tier to use for this operation
* @return the updated options object
*/
public OutStreamOptions setWriteTier(int writeTier) {
mWriteTier = writeTier;
return this;
}

/**
* Sets the {@link WriteType}.
*
Expand Down Expand Up @@ -239,6 +259,7 @@ public boolean equals(Object o) {
&& Objects.equal(mTtl, that.mTtl)
&& Objects.equal(mTtlAction, that.mTtlAction)
&& Objects.equal(mLocationPolicy, that.mLocationPolicy)
&& mWriteTier == that.mWriteTier
&& Objects.equal(mWriteType, that.mWriteType)
&& Objects.equal(mUfsPath, that.mUfsPath)
&& Objects.equal(mPermission, that.mPermission);
Expand All @@ -250,6 +271,7 @@ public int hashCode() {
mTtl,
mTtlAction,
mLocationPolicy,
mWriteTier,
mWriteType,
mUfsPath,
mPermission);
Expand All @@ -260,8 +282,9 @@ public String toString() {
return Objects.toStringHelper(this)
.add("blockSizeBytes", mBlockSizeBytes)
.add("ttl", mTtl)
.add("mTtlAction", mTtlAction)
.add("ttlAction", mTtlAction)
.add("locationPolicy", mLocationPolicy)
.add("writeTier", mWriteTier)
.add("writeType", mWriteType)
.add("permission", mPermission)
.add("ufsPath", mUfsPath)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ public void getOutStreamMissingLocationPolicy() throws IOException {
@Test
public void getOutStreamLocal() throws Exception {
File tmp = mTestFolder.newFile();
Mockito.when(mBlockWorkerClient.requestBlockLocation(Matchers.eq(BLOCK_ID), Matchers.anyLong()))
Mockito.when(mBlockWorkerClient
.requestBlockLocation(Matchers.eq(BLOCK_ID), Matchers.anyLong(), Matchers.anyInt()))
.thenReturn(tmp.getAbsolutePath());

OutStreamOptions options = OutStreamOptions.defaults().setBlockSizeBytes(BLOCK_LENGTH)
Expand Down
Loading

0 comments on commit 786c7e2

Please sign in to comment.