Skip to content

Commit

Permalink
[TEST] test didn't take into account other cluster service tasks
Browse files Browse the repository at this point in the history
The pending tests on an actual node should take into account that other tasks might be executing on that node, thus failing when it happens
  • Loading branch information
kimchy committed Jul 6, 2014
1 parent 045ce09 commit 6e99448
Showing 1 changed file with 30 additions and 21 deletions.
51 changes: 30 additions & 21 deletions src/test/java/org/elasticsearch/cluster/ClusterServiceTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.elasticsearch.cluster;

import com.carrotsearch.randomizedtesting.annotations.Repeat;
import com.google.common.base.Predicate;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
Expand Down Expand Up @@ -404,14 +405,15 @@ public void onFailure(String source, Throwable t) {
}

@Test
@Repeat(iterations = 1000)
public void testPendingUpdateTask() throws Exception {
Settings zenSettings = settingsBuilder()
.put("discovery.type", "zen").build();
String node_0 = internalCluster().startNode(zenSettings);
internalCluster().startNodeClient(zenSettings);

Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
String node_0 = internalCluster().startNode(settings);
internalCluster().startNodeClient(settings);

ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node_0);
final ClusterService clusterService = internalCluster().getInstance(ClusterService.class, node_0);
final CountDownLatch block1 = new CountDownLatch(1);
final CountDownLatch invoked1 = new CountDownLatch(1);
clusterService.submitStateUpdateTask("1", new ClusterStateUpdateTask() {
Expand Down Expand Up @@ -453,33 +455,39 @@ public void clusterStateProcessed(String source, ClusterState oldState, ClusterS
});
}

// there might be other tasks in this node, make sure to only take the ones we add into account in this test

// The tasks can be re-ordered, so we need to check out-of-order
Set<String> controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
List<PendingClusterTask> pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks.size(), equalTo(10));
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(10));
assertThat(pendingClusterTasks.get(0).getSource().string(), equalTo("1"));
assertThat(pendingClusterTasks.get(0).isExecuting(), equalTo(true));
for (PendingClusterTask task : pendingClusterTasks) {
assertTrue(controlSources.remove(task.getSource().string()));
controlSources.remove(task.getSource().string());
}
assertTrue(controlSources.isEmpty());

controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10"));
PendingClusterTasksResponse response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks().size(), equalTo(10));
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(10));
assertThat(response.pendingTasks().get(0).getSource().string(), equalTo("1"));
assertThat(response.pendingTasks().get(0).isExecuting(), equalTo(true));
for (PendingClusterTask task : response) {
assertTrue(controlSources.remove(task.getSource().string()));
controlSources.remove(task.getSource().string());
}
assertTrue(controlSources.isEmpty());
block1.countDown();
invoked2.await();

pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks, empty());
response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks(), empty());
// whenever we test for no tasks, we need to awaitBusy since this is a live node
assertTrue(awaitBusy(new Predicate<Object>() {
@Override
public boolean apply(Object input) {
return clusterService.pendingTasks().isEmpty();
}
}));
waitNoPendingTasksOnAll();

final CountDownLatch block2 = new CountDownLatch(1);
final CountDownLatch invoked3 = new CountDownLatch(1);
Expand Down Expand Up @@ -519,19 +527,20 @@ public void onFailure(String source, Throwable t) {
Thread.sleep(100);

pendingClusterTasks = clusterService.pendingTasks();
assertThat(pendingClusterTasks.size(), equalTo(5));
assertThat(pendingClusterTasks.size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : pendingClusterTasks) {
assertTrue(controlSources.remove(task.getSource().string()));
controlSources.remove(task.getSource().string());
}
assertTrue(controlSources.isEmpty());

response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().execute().actionGet();
assertThat(response.pendingTasks().size(), equalTo(5));
response = internalCluster().clientNodeClient().admin().cluster().preparePendingClusterTasks().get();
assertThat(response.pendingTasks().size(), greaterThanOrEqualTo(5));
controlSources = new HashSet<>(Arrays.asList("1", "2", "3", "4", "5"));
for (PendingClusterTask task : response) {
assertTrue(controlSources.remove(task.getSource().string()));
assertThat(task.getTimeInQueueInMillis(), greaterThan(0l));
if (controlSources.remove(task.getSource().string())) {
assertThat(task.getTimeInQueueInMillis(), greaterThan(0l));
}
}
assertTrue(controlSources.isEmpty());
block2.countDown();
Expand Down Expand Up @@ -613,7 +622,7 @@ public boolean apply(Object obj) {
* Note, this test can only work as long as we have a single thread executor executing the state update tasks!
*/
@Test
public void testPriorizedTasks() throws Exception {
public void testPrioritizedTasks() throws Exception {
Settings settings = settingsBuilder()
.put("discovery.type", "local")
.build();
Expand Down

0 comments on commit 6e99448

Please sign in to comment.