Skip to content

Commit

Permalink
YARN-5609. Expose upgrade and restart API in ContainerManagementProto…
Browse files Browse the repository at this point in the history
…col. Contributed by Arun Suresh
  • Loading branch information
jian-he committed Sep 26, 2016
1 parent 14a696f commit fe644ba
Show file tree
Hide file tree
Showing 20 changed files with 732 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,15 @@
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
Expand Down Expand Up @@ -476,5 +481,30 @@ public ResourceLocalizationResponse localize(
ResourceLocalizationRequest request) throws YarnException, IOException {
return null;
}

@Override
public ReInitializeContainerResponse reInitializeContainer(
ReInitializeContainerRequest request) throws YarnException,
IOException {
return null;
}

@Override
public RestartContainerResponse restartContainer(ContainerId containerId)
throws YarnException, IOException {
return null;
}

@Override
public RollbackResponse rollbackLastReInitialization(
ContainerId containerId) throws YarnException, IOException {
return null;
}

@Override
public CommitResponse commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher.EventType;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
Expand Down Expand Up @@ -481,6 +486,31 @@ public ResourceLocalizationResponse localize(
ResourceLocalizationRequest request) throws YarnException, IOException {
return null;
}

@Override
public ReInitializeContainerResponse reInitializeContainer(
ReInitializeContainerRequest request) throws YarnException,
IOException {
return null;
}

@Override
public RestartContainerResponse restartContainer(ContainerId containerId)
throws YarnException, IOException {
return null;
}

@Override
public RollbackResponse rollbackLastReInitialization(
ContainerId containerId) throws YarnException, IOException {
return null;
}

@Override
public CommitResponse commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
return null;
}
}

@SuppressWarnings("serial")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,17 @@
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Stable;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
Expand Down Expand Up @@ -215,4 +220,53 @@ SignalContainerResponse signalToContainer(SignalContainerRequest request)
@Unstable
ResourceLocalizationResponse localize(ResourceLocalizationRequest request)
throws YarnException, IOException;

/**
* ReInitialize the Container with a new Launch Context.
* @param request Specify the new ContainerLaunchContext.
* @return Response that the ReInitialize request is accepted.
* @throws YarnException Exception specific to YARN.
* @throws IOException IOException thrown from the RPC layer.
*/
@Public
@Unstable
ReInitializeContainerResponse reInitializeContainer(
ReInitializeContainerRequest request) throws YarnException, IOException;

/**
* Restart the container.
* @param containerId Container Id.
* @return Response that the restart request is accepted.
* @throws YarnException Exception specific to YARN.
* @throws IOException IOException thrown from the RPC layer.
*/
@Public
@Unstable
RestartContainerResponse restartContainer(ContainerId containerId)
throws YarnException, IOException;

/**
* Rollback the Last ReInitialization if possible.
* @param containerId Container Id.
* @return Response that the rollback request is accepted.
* @throws YarnException Exception specific to YARN.
* @throws IOException IOException thrown from the RPC layer.
*/
@Public
@Unstable
RollbackResponse rollbackLastReInitialization(ContainerId containerId)
throws YarnException, IOException;

/**
* Commit the Last ReInitialization if possible. Once the reinitialization
* has been committed, It cannot be rolled back.
* @param containerId Container Id.
* @return Response that the commit request is accepted.
* @throws YarnException Exception specific to YARN.
* @throws IOException IOException thrown from the RPC layer.
*/
@Public
@Unstable
CommitResponse commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.yarn;

import "yarn_protos.proto";
import "yarn_service_protos.proto";

service ContainerManagementProtocolService {
Expand All @@ -37,4 +38,9 @@ service ContainerManagementProtocolService {
rpc increaseContainersResource(IncreaseContainersResourceRequestProto) returns (IncreaseContainersResourceResponseProto);
rpc signalToContainer(SignalContainerRequestProto) returns (SignalContainerResponseProto);
rpc localize(ResourceLocalizationRequestProto) returns (ResourceLocalizationResponseProto);

rpc reInitializeContainer(ReInitializeContainerRequestProto) returns (ReInitializeContainerResponseProto);
rpc restartContainer(ContainerIdProto) returns (RestartContainerResponseProto);
rpc rollbackLastReInitialization(ContainerIdProto) returns (RollbackResponseProto);
rpc commitLastReInitialization(ContainerIdProto) returns (CommitResponseProto);
}
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,24 @@ message ResourceLocalizationRequestProto {
message ResourceLocalizationResponseProto {
}

message ReInitializeContainerRequestProto {
optional ContainerIdProto container_id = 1;
optional ContainerLaunchContextProto container_launch_context = 2;
optional bool auto_commit = 3 [default = true];
}

message ReInitializeContainerResponseProto {
}

message RestartContainerResponseProto {
}

message RollbackResponseProto {
}

message CommitResponseProto {
}

//// bulk API records
message StartContainersRequestProto {
repeated StartContainerRequestProto start_container_request = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,33 +27,50 @@
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
import org.apache.hadoop.yarn.api.protocolrecords.CommitResponse;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesResponse;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReInitializeContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceLocalizationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RestartContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.RollbackResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StopContainersResponse;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.CommitResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.GetContainerStatusesResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.IncreaseContainersResourceResponsePBImpl;

import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ReInitializeContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.ResourceLocalizationResponsePBImpl;

import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RestartContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb
.RollbackResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StartContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestPBImpl;
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.ipc.RPCUtil;
import org.apache.hadoop.yarn.proto.YarnProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.GetContainerStatusesRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.IncreaseContainersResourceRequestProto;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.ResourceLocalizationRequestProto;
Expand Down Expand Up @@ -185,4 +202,60 @@ public ResourceLocalizationResponse localize(
return null;
}
}

@Override
public ReInitializeContainerResponse reInitializeContainer(
ReInitializeContainerRequest request) throws YarnException, IOException {
YarnServiceProtos.ReInitializeContainerRequestProto requestProto =
((ReInitializeContainerRequestPBImpl) request).getProto();
try {
return new ReInitializeContainerResponsePBImpl(
proxy.reInitializeContainer(null, requestProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}

@Override
public RestartContainerResponse restartContainer(ContainerId containerId)
throws YarnException, IOException {
YarnProtos.ContainerIdProto containerIdProto = ProtoUtils
.convertToProtoFormat(containerId);
try {
return new RestartContainerResponsePBImpl(
proxy.restartContainer(null, containerIdProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}

@Override
public RollbackResponse rollbackLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
YarnProtos.ContainerIdProto containerIdProto = ProtoUtils
.convertToProtoFormat(containerId);
try {
return new RollbackResponsePBImpl(
proxy.rollbackLastReInitialization(null, containerIdProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}

@Override
public CommitResponse commitLastReInitialization(ContainerId containerId)
throws YarnException, IOException {
YarnProtos.ContainerIdProto containerIdProto = ProtoUtils
.convertToProtoFormat(containerId);
try {
return new CommitResponsePBImpl(
proxy.commitLastReInitialization(null, containerIdProto));
} catch (ServiceException e) {
RPCUtil.unwrapAndThrowException(e);
return null;
}
}
}
Loading

0 comments on commit fe644ba

Please sign in to comment.