Skip to content

Commit

Permalink
NIFI-3548: Fixed bug that caused failed requests to not get removed f…
Browse files Browse the repository at this point in the history
…rom 'request map' and also results in that preventing the purging logic, which would then unintentially throw exceptions. This closes apache#1555
  • Loading branch information
markap14 authored and mcgilman committed Mar 3, 2017
1 parent 5990db3 commit 4ed64e7
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ public boolean isOlderThan(final long time, final TimeUnit timeUnit) {
}

@Override
public boolean isComplete() {
return getMergedResponse() != null;
public synchronized boolean isComplete() {
return failure != null || mergedResponse != null || requestsCompleted.get() >= responseMap.size();
}

@Override
Expand All @@ -125,6 +125,10 @@ public NodeResponse getMergedResponse() {

public synchronized NodeResponse getMergedResponse(final boolean triggerCallback) {
if (failure != null) {
if (completedResultFetchedCallback != null) {
completedResultFetchedCallback.run();
}

throw failure;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,36 @@ public static void setupClass() {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/conf/nifi.properties");
}

@Test
public void testFailedRequestsAreCleanedUp() {
withReplicator(replicator -> {
final Set<NodeIdentifier> nodeIds = new HashSet<>();
nodeIds.add(new NodeIdentifier("1", "localhost", 8000, "localhost", 8001, "localhost", 8002, 8003, false));
final URI uri = new URI("http://localhost:8080/processors/1");
final Entity entity = new ProcessorEntity();

// set the user
final Authentication authentication = new NiFiAuthenticationToken(new NiFiUserDetails(StandardNiFiUser.ANONYMOUS));
SecurityContextHolder.getContext().setAuthentication(authentication);

final AsyncClusterResponse response = replicator.replicate(nodeIds, HttpMethod.GET, uri, entity, new HashMap<>(), true, true);

// We should get back the same response object
assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));

assertEquals(HttpMethod.GET, response.getMethod());
assertEquals(nodeIds, response.getNodesInvolved());

assertTrue(response == replicator.getClusterResponse(response.getRequestIdentifier()));

final NodeResponse nodeResponse = response.awaitMergedResponse(3, TimeUnit.SECONDS);
assertEquals(8000, nodeResponse.getNodeId().getApiPort());
assertEquals(ClientResponse.Status.FORBIDDEN.getStatusCode(), nodeResponse.getStatus());

assertNull(replicator.getClusterResponse(response.getRequestIdentifier()));
}, Status.FORBIDDEN, 0L, null);
}

/**
* If we replicate a request, whenever we obtain the merged response from
* the AsyncClusterResponse object, the response should no longer be
Expand Down

0 comments on commit 4ed64e7

Please sign in to comment.