Merge pull request apache#1476 from pjain1/improved_tests
all the review comments have been taken care of, so merging.
himanshug committed Jul 9, 2015
2 parents b3d360f + 58cc395 commit e9b627b
Showing 2 changed files with 235 additions and 70 deletions.
@@ -81,13 +81,16 @@ public class DruidCoordinatorTest extends CuratorTestBase
private ConcurrentMap<String, LoadQueuePeon> loadManagementPeons;
private LoadQueuePeon loadQueuePeon;
private MetadataRuleManager metadataRuleManager;
- private SegmentReplicantLookup segmentReplicantLookup;
private CountDownLatch leaderAnnouncerLatch;
+ private CountDownLatch leaderUnannouncerLatch;
private PathChildrenCache pathChildrenCache;
private DruidCoordinatorConfig druidCoordinatorConfig;
private ObjectMapper objectMapper;
private JacksonConfigManager configManager;
+ private DruidNode druidNode;
private static final String LOADPATH = "/druid/loadqueue/localhost:1234";
+ private static final long COORDINATOR_START_DELAY = 1;
+ private static final long COORDINATOR_PERIOD = 100;

@Before
public void setUp() throws Exception
@@ -108,9 +111,8 @@ public void setUp() throws Exception
setupServerAndCurator();
curator.start();
curator.create().creatingParentsIfNeeded().forPath(LOADPATH);
- segmentReplicantLookup = null;
objectMapper = new DefaultObjectMapper();
- druidCoordinatorConfig = new TestDruidCoordinatorConfig(new Duration(1), new Duration(6000), null, null, null, false, false);
+ druidCoordinatorConfig = new TestDruidCoordinatorConfig(new Duration(COORDINATOR_START_DELAY), new Duration(COORDINATOR_PERIOD), null, null, null, false, false);
pathChildrenCache = new PathChildrenCache(curator, LOADPATH, true, true, Execs.singleThreaded("coordinator_test_path_children_cache-%d"));
loadQueuePeon = new LoadQueuePeon(
curator,
@@ -120,6 +122,7 @@ public void setUp() throws Exception
Execs.singleThreaded("coordinator_test_load_queue_peon-%d"),
druidCoordinatorConfig
);
+ druidNode = new DruidNode("hey", "what", 1234);
loadManagementPeons = new MapMaker().makeMap();
scheduledExecutorFactory = new ScheduledExecutorFactory()
{
@@ -130,6 +133,7 @@ public ScheduledExecutorService create(int corePoolSize, final String nameFormat
}
};
leaderAnnouncerLatch = new CountDownLatch(1);
+ leaderUnannouncerLatch = new CountDownLatch(1);
coordinator = new DruidCoordinator(
druidCoordinatorConfig,
new ZkPathsConfig()
@@ -157,8 +161,14 @@ public void announce(DruidNode node)
// count down when this coordinator becomes the leader
leaderAnnouncerLatch.countDown();
}
+
+ @Override
+ public void unannounce(DruidNode node)
+ {
+ leaderUnannouncerLatch.countDown();
+ }
},
- new DruidNode("hey", "what", 1234),
+ druidNode,
loadManagementPeons
);
}
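
The overridden announce/unannounce methods above are the test's synchronization points: each leadership transition trips a dedicated one-shot latch, so the test can block until the coordinator has actually gained or given up leadership instead of sleeping for a fixed interval. A minimal sketch of that pattern, using a hypothetical LeadershipAnnouncer interface in place of Druid's ServiceAnnouncer:

```java
import java.util.concurrent.CountDownLatch;

// Hypothetical stand-in for the ServiceAnnouncer the test overrides.
interface LeadershipAnnouncer
{
  void announce();
  void unannounce();
}

public class LatchSyncExample
{
  public static void main(String[] args) throws InterruptedException
  {
    final CountDownLatch announced = new CountDownLatch(1);
    final CountDownLatch unannounced = new CountDownLatch(1);

    // Each leadership transition trips its own one-shot latch.
    LeadershipAnnouncer announcer = new LeadershipAnnouncer()
    {
      @Override
      public void announce() { announced.countDown(); }

      @Override
      public void unannounce() { unannounced.countDown(); }
    };

    // Simulate another thread gaining, then giving up, leadership.
    new Thread(() -> {
      announcer.announce();
      announcer.unannounce();
    }).start();

    // Deterministic waits, no sleeps; an enclosing @Test(timeout = ...)
    // would bound these if the transitions never happened.
    announced.await();
    unannounced.await();
    System.out.println("observed both leadership transitions");
  }
}
```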
@@ -235,19 +245,21 @@ public String getInventoryPath()
EasyMock.verify(metadataRuleManager);
}

- @Test
+ @Test(timeout = 1500L)
public void testCoordinatorRun() throws Exception{
String dataSource = "dataSource1";
String tier= "hot";
Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(tier, 2));
EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString()))
.andReturn(ImmutableList.of(foreverLoadRule)).atLeastOnce();
+ metadataRuleManager.stop();
+ EasyMock.expectLastCall().once();
EasyMock.replay(metadataRuleManager);

DruidDataSource[] druidDataSources = {
new DruidDataSource(dataSource, new HashMap())
};
- DataSegment dataSegment = new DataSegment(dataSource, new Interval("2010-01-01/P1D"), "v1", null, null, null, null, 0x9, 0);
+ final DataSegment dataSegment = new DataSegment(dataSource, new Interval("2010-01-01/P1D"), "v1", null, null, null, null, 0x9, 0);
druidDataSources[0].addSegment("0", dataSegment);

EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes();
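Throughout the test, collaborators that are not under test follow EasyMock's record/replay/verify lifecycle: expectations such as isStarted() and stop() are recorded on the mock, replay() switches it into playback mode, and verify() at the end fails the test if an expected call (for example, a stop() recorded with expectLastCall().once()) never happened. A self-contained sketch of that lifecycle, with a hypothetical RuleManager interface standing in for MetadataRuleManager:

```java
import static org.easymock.EasyMock.createMock;
import static org.easymock.EasyMock.expect;
import static org.easymock.EasyMock.expectLastCall;
import static org.easymock.EasyMock.replay;
import static org.easymock.EasyMock.verify;

// Hypothetical collaborator, standing in for MetadataRuleManager.
interface RuleManager
{
  boolean isStarted();
  void stop();
}

public class EasyMockLifecycle
{
  public static void main(String[] args)
  {
    RuleManager manager = createMock(RuleManager.class);

    // 1. Record: declare the calls the code under test may make.
    expect(manager.isStarted()).andReturn(true).anyTimes();
    manager.stop();
    expectLastCall().once();        // stop() must be called exactly once

    // 2. Replay: switch the mock from recording to playback.
    replay(manager);

    // ... exercise the code under test ...
    if (manager.isStarted()) {
      manager.stop();
    }

    // 3. Verify: fail if an expected call (here, stop()) never happened.
    verify(manager);
  }
}
```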
@@ -259,17 +271,7 @@ public void testCoordinatorRun() throws Exception{
EasyMock.expect(immutableDruidDataSource.getSegments())
.andReturn(ImmutableSet.of(dataSegment)).atLeastOnce();
EasyMock.replay(immutableDruidDataSource);
- EasyMock.expect(druidServer.toImmutableDruidServer()).andReturn(
- new ImmutableDruidServer(
- new DruidServerMetadata("server1", "localhost", 5L, "historical", tier, 0),
- 1L,
- ImmutableMap.<String, ImmutableDruidDataSource>of(),
- ImmutableMap.<String, DataSegment>of()
- )
- ).atLeastOnce();
- EasyMock.expect(druidServer.isAssignable()).andReturn(true).anyTimes();
- EasyMock.expect(druidServer.getName()).andReturn("server1").anyTimes();
- EasyMock.replay(druidServer);
+ druidServer = new DruidServer("server1", "localhost", 5L, "historical", tier, 0);
loadManagementPeons.put("server1", loadQueuePeon);

EasyMock.expect(serverInventoryView.getInventory()).andReturn(
@@ -282,18 +284,21 @@ public void testCoordinatorRun() throws Exception{
throw ex;
}
EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes();
+ serverInventoryView.stop();
+ EasyMock.expectLastCall().once();
EasyMock.replay(serverInventoryView);

coordinator.start();
// Wait for this coordinator to become leader
try {
- Assert.assertTrue(leaderAnnouncerLatch.await(4, TimeUnit.SECONDS));
+ leaderAnnouncerLatch.await();
}
catch (InterruptedException ex) {
throw ex;
}
// This coordinator should be leader by now
Assert.assertTrue(coordinator.isLeader());
+ Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader());

final CountDownLatch assignSegmentLatch = new CountDownLatch(1);
pathChildrenCache.getListenable().addListener(
@@ -306,6 +311,8 @@ public void childEvent(
{
if(pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){
//Coordinator should try to assign segment to druidServer historical
+ //Simulate historical loading segment
+ druidServer.addDataSegment(dataSegment.getIdentifier(), dataSegment);
assignSegmentLatch.countDown();
}
}
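
This PathChildrenCache listener is how the test observes the coordinator working: assigning the segment creates a child znode under LOADPATH, Curator fires a CHILD_ADDED event, and the callback simulates the historical finishing the load before counting down assignSegmentLatch. A minimal sketch of the same Curator recipe against a standalone ZooKeeper (the connect string and paths are placeholders):

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.PathChildrenCache;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
import org.apache.curator.retry.RetryOneTime;

import java.util.concurrent.CountDownLatch;

public class LoadQueueWatcher
{
  public static void main(String[] args) throws Exception
  {
    CuratorFramework client =
        CuratorFrameworkFactory.newClient("localhost:2181", new RetryOneTime(100));
    client.start();

    final CountDownLatch childAdded = new CountDownLatch(1);
    // cacheData = true so event payloads carry the znode's bytes.
    PathChildrenCache cache =
        new PathChildrenCache(client, "/druid/loadqueue/host:1234", true);
    cache.getListenable().addListener(
        new PathChildrenCacheListener()
        {
          @Override
          public void childEvent(CuratorFramework curator, PathChildrenCacheEvent event)
          {
            if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {
              // A segment-load request appeared; react here.
              childAdded.countDown();
            }
          }
        }
    );
    cache.start();

    client.create().creatingParentsIfNeeded()
          .forPath("/druid/loadqueue/host:1234/some-segment-id");
    childAdded.await();

    cache.close();
    client.close();
  }
}
```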
@@ -317,20 +324,33 @@ public void childEvent(
catch (Exception ex) {
throw ex;
}
- Assert.assertTrue(assignSegmentLatch.await(4, TimeUnit.SECONDS));
+
+ Assert.assertTrue(assignSegmentLatch.await(2, TimeUnit.SECONDS));
+ Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus());
+ curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getIdentifier()));
+ // Wait for coordinator thread to run so that replication status is updated
+ while (coordinator.getSegmentAvailability().snapshot().get(dataSource) != 0) {
+ Thread.sleep(50);
+ }
+ Map segmentAvailability = coordinator.getSegmentAvailability().snapshot();
+ Assert.assertEquals(1, segmentAvailability.size());
+ Assert.assertEquals(0l, segmentAvailability.get(dataSource));
+
Map<String, CountingMap<String>> replicationStatus = coordinator.getReplicationStatus();
Assert.assertNotNull(replicationStatus);
Assert.assertEquals(1, replicationStatus.entrySet().size());

CountingMap<String> dataSourceMap = replicationStatus.get(tier);
Assert.assertNotNull(dataSourceMap);
Assert.assertEquals(1, dataSourceMap.size());
Assert.assertNotNull(dataSourceMap.get(dataSource));
- // ServerInventoryView and historical DruidServer are mocked
- // so the historical will never announce the segment and thus server inventory view will not be updated
- // The load rules asks for 2 replicas, therefore 2 replicas will still be pending
- Assert.assertEquals(2l, dataSourceMap.get(dataSource).get());
- EasyMock.verify(druidServer);
+ // Simulated the adding of segment to druidServer during SegmentChangeRequestLoad event
+ // The load rules asks for 2 replicas, therefore 1 replica should still be pending
+ Assert.assertEquals(1l, dataSourceMap.get(dataSource).get());
coordinator.stop();
+ leaderUnannouncerLatch.await();
+ Assert.assertFalse(coordinator.isLeader());
+ Assert.assertNull(coordinator.getCurrentLeader());
EasyMock.verify(serverInventoryView);
EasyMock.verify(metadataRuleManager);
}
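
The final waits follow one idiom: block on a condition that a background coordinator thread will eventually satisfy (a latch await with no timeout, or a sleep-poll loop like the one above) and let the @Test(timeout = 1500L) annotation bound the whole method instead of scattering per-wait timeouts. A generic sketch of the sleep-poll half of that idiom (names are illustrative):

```java
import java.util.concurrent.Callable;

public class PollUntil
{
  /**
   * Poll a condition every pollMillis until it becomes true.
   * Deliberately unbounded: an enclosing @Test(timeout = ...) is
   * expected to kill the test if the condition never holds.
   */
  static void waitFor(Callable<Boolean> condition, long pollMillis) throws Exception
  {
    while (!condition.call()) {
      Thread.sleep(pollMillis);
    }
  }

  public static void main(String[] args) throws Exception
  {
    final long start = System.currentTimeMillis();
    // Toy condition: "the background work is done after ~200 ms".
    waitFor(() -> System.currentTimeMillis() - start > 200, 50);
    System.out.println("condition satisfied after " +
                       (System.currentTimeMillis() - start) + " ms");
  }
}
```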

