Skip to content

Commit

Permalink
Revert "Samza-2330: Handle expired resource request for Container all…
Browse files Browse the repository at this point in the history
…ocator when host affinity is disabled"

This reverts commit 2ae3450.
  • Loading branch information
Sanil15 authored and mynameborat committed Oct 25, 2019
1 parent c5af0af commit d99a870
Show file tree
Hide file tree
Showing 5 changed files with 18 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,7 @@ Samza supports both standalone and clustered ([YARN](yarn-jobs.html)) [deploymen
|cluster-manager.container.fail.job.after.retries|true|This configuration sets the behavior of the job after all `cluster-manager.container.retry.count`s are exhausted and each retry is within the `cluster-manager.container.retry.window.ms` period on any single container. If set to true, the whole job will fail if any container fails after the last retry. If set to false, the job will continue to run without the failed container. The typical use cases of setting this to false is to aid in debugging the cluster manager when containers fail unexpectedly and also to allow other healthy containers to continue to run so that lag does not accumulate across all containers. Samza job operators should diligent in monitoring the `job-healthy` and `failed-containers` metrics when setting this configuration to false. A full restart of the job is required if another attempt to restart the container is needed after the container failure.|
|cluster-manager.jobcoordinator.jmx.enabled|true|This is deprecated in favor of `job.jmx.enabled`|
|cluster-manager.allocator.sleep.ms|3600|The container allocator thread is responsible for matching requests to allocated containers. The sleep interval for this thread is configured using this property.|
|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. If no resource is obtained after cluster-manager.container.request.timeout.ms the request is declared to be expired.. When a request expires, it gets allocated to any available container that was returned by the cluster manager if none is available the existing resource request is cancelled and a new ANY-HOST resource request is issued. This behavior holds regardless of host-affinity enabled or not.|
|cluster-manager.container.request.timeout.ms|5000|The allocator thread periodically checks the state of the container requests and allocated containers to determine the assignment of a container to an allocated resource. This property determines the number of milliseconds before a container request is considered to have expired / timed-out. When a request expires, it gets allocated to any available container that was returned by the cluster manager.|
|task.execute|bin/run-container.sh|The command that starts a Samza container. The script must be included in the [job package](./packaging.html). There is usually no need to customize this.|
|task.java.home| |The JAVA_HOME path for Samza containers. By setting this property, you can use a java version that is different from your cluster's java version. Remember to set the `yarn.am.java.home` as well.|
|yarn.am.container.<br>memory.mb|1024|Each Samza job when running in Yarn has one special container, the [ApplicationMaster](../yarn/application-master.html) (AM), which manages the execution of the job. This property determines how much memory, in megabytes, to request from YARN for running the ApplicationMaster.|
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@
* When host-affinity is disabled, the resource-request's preferredHost param is set to {@link ResourceRequestState#ANY_HOST}
* </li>
* <li>
* When the preferred resource has not been obtained after {@code requestExpiryTimeout} milliseconds of the request
* being made, the resource is declared expired. Expired request are handled by allocating them to *ANY*
* allocated resource if available. If no surplus resources are available the current preferred resource-request
* is cancelled and resource-request for ANY_HOST is issued
* When host-affinity is enabled and a preferred resource has not been obtained after {@code requestExpiryTimeout}
* milliseconds of the request being made, the resource is declared expired. The expired request are handled by
* allocating them to *ANY* allocated resource if available. If no surplus resources are available the current preferred
* resource-request is cancelled and resource-request for ANY_HOST is issued
* </li>
* <li>
* When host-affinity is not enabled, this periodically wakes up to assign a processor to *ANY* allocated resource.
Expand Down Expand Up @@ -219,7 +219,9 @@ void assignResourceRequests() {

if (expired) {
updateExpiryMetrics(request);
handleExpiredRequest(processorId, preferredHost, request);
if (hostAffinityEnabled) {
handleExpiredRequestWithHostAffinityEnabled(processorId, preferredHost, request);
}
} else {
LOG.info("Request for Processor ID: {} on preferred host {} has not expired yet."
+ "Request creation time: {}. Current Time: {}. Request timeout: {} ms", processorId, preferredHost,
Expand All @@ -233,10 +235,10 @@ void assignResourceRequests() {
/**
* Handles an expired resource request for both active and standby containers. Since a preferred host cannot be obtained
* this method checks the availability of surplus ANY_HOST resources and launches the container if available. Otherwise
* issues an ANY_HOST request. This behavior holds regardless of host-affinity enabled or not.
* issues an ANY_HOST request.
*/
@VisibleForTesting
void handleExpiredRequest(String processorId, String preferredHost,
void handleExpiredRequestWithHostAffinityEnabled(String processorId, String preferredHost,
SamzaResourceRequest request) {
boolean resourceAvailableOnAnyHost = hasAllocatedResource(ResourceRequestState.ANY_HOST);
if (standbyContainerManager.isPresent()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,9 +425,9 @@ public void testExpiredRequestAllocationOnAnyHost() throws Exception {
// Verify that all the request that were created as preferred host requests expired
assertTrue(state.preferredHostRequests.get() == 2);
assertTrue(state.expiredPreferredHostRequests.get() == 2);
verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
any(SamzaResourceRequest.class));
verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
any(SamzaResourceRequest.class));

// Verify that preferred host request were cancelled and since no surplus resources were available
Expand Down Expand Up @@ -469,10 +469,10 @@ public void testExpiredRequestAllocationOnSurplusAnyHostWithRunStreamProcessor()
Thread.sleep(100);

// Verify that all the request that were created as preferred host requests expired
assertEquals(state.expiredPreferredHostRequests.get(), 2);
verify(spyAllocator, times(1)).handleExpiredRequest(eq("0"), eq("hostname-0"),
assertTrue(state.expiredPreferredHostRequests.get() == 2);
verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("0"), eq("hostname-0"),
any(SamzaResourceRequest.class));
verify(spyAllocator, times(1)).handleExpiredRequest(eq("1"), eq("hostname-1"),
verify(spyAllocator, times(1)).handleExpiredRequestWithHostAffinityEnabled(eq("1"), eq("hostname-1"),
any(SamzaResourceRequest.class));

// Verify that runStreamProcessor was invoked with already available ANY_HOST requests
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.JobModelManager;
Expand Down Expand Up @@ -88,7 +87,6 @@ private static Config getConfig() {
put("cluster-manager.container.count", "1");
put("cluster-manager.container.retry.count", "1");
put("cluster-manager.container.retry.window.ms", "1999999999");
put("cluster-manager.container.request.timeout.ms", "3");
put("cluster-manager.allocator.sleep.ms", "10");
put("cluster-manager.container.memory.mb", "512");
put("yarn.package.path", "/foo");
Expand Down Expand Up @@ -284,61 +282,12 @@ public void testRequestAllocationWithRunStreamProcessor() throws Exception {
resourceRequestCaptor.getAllValues()
.forEach(resourceRequest -> assertEquals(resourceRequest.getPreferredHost(), ResourceRequestState.ANY_HOST));
assertTrue(state.anyHostRequests.get() == containersToHostMapping.size());
// Expiry currently should not be invoked
verify(spyAllocator, never()).handleExpiredRequest(anyString(), anyString(),
// Expiry currently is only handled for host affinity enabled cases
verify(spyAllocator, never()).handleExpiredRequestWithHostAffinityEnabled(anyString(), anyString(),
any(SamzaResourceRequest.class));
// Only updated when host affinity is enabled
assertTrue(state.matchedResourceRequests.get() == 0);
assertTrue(state.preferredHostRequests.get() == 0);
spyAllocator.stop();
}

@Test
public void testExpiredRequestAllocationOnAnyHost() throws Exception {
MockClusterResourceManager spyManager = spy(new MockClusterResourceManager(callback, state));
spyAllocator = Mockito.spy(
new ContainerAllocator(spyManager, config, state, false, Optional.empty()));

// Request Resources
spyAllocator.requestResources(new HashMap<String, String>() {
{
put("0", "host-0");
put("1", "host-1");
}
});

spyThread = new Thread(spyAllocator);
// Start the container allocator thread periodic assignment
spyThread.start();

// Let the request expire, expiration timeout is 3 ms
Thread.sleep(100);

// Verify that all the request that were created as ANY_HOST host
// and all created requests expired
assertEquals(state.preferredHostRequests.get(), 0);
// Atleast 2 requests should expire & 2 ANY_HOST requests should be generated
assertTrue(state.anyHostRequests.get() >= 4);
assertTrue(state.expiredAnyHostRequests.get() >= 2);

verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("0"), eq(ResourceRequestState.ANY_HOST),
any(SamzaResourceRequest.class));
verify(spyAllocator, atLeastOnce()).handleExpiredRequest(eq("1"), eq(ResourceRequestState.ANY_HOST),
any(SamzaResourceRequest.class));

// Verify that preferred host request were cancelled and since no surplus resources were available
// requestResource was invoked with ANY_HOST requests
ArgumentCaptor<SamzaResourceRequest> cancelledRequestCaptor = ArgumentCaptor.forClass(SamzaResourceRequest.class);
// At least 2 preferred host requests were cancelled
verify(spyManager, atLeast(2)).cancelResourceRequest(cancelledRequestCaptor.capture());
// Verify all the request cancelled were ANY_HOST
assertTrue(cancelledRequestCaptor.getAllValues()
.stream()
.map(resourceRequest -> resourceRequest.getPreferredHost())
.collect(Collectors.toSet())
.size() == 1);
containerAllocator.stop();

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class TestContainerProcessManager {
put("cluster-manager.container.retry.count", "1");
put("cluster-manager.container.retry.window.ms", "1999999999");
put("cluster-manager.allocator.sleep.ms", "1");
put("cluster-manager.container.request.timeout.ms", "100");
put("cluster-manager.container.request.timeout.ms", "2");
put("cluster-manager.container.memory.mb", "512");
put("yarn.package.path", "/foo");
put("task.inputs", "test-system.test-stream");
Expand Down

0 comments on commit d99a870

Please sign in to comment.