diff --git a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java index bcd7f3a07db9..b15300a0d69d 100644 --- a/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java +++ b/server/src/test/java/io/druid/server/coordinator/DruidCoordinatorTest.java @@ -81,13 +81,16 @@ public class DruidCoordinatorTest extends CuratorTestBase private ConcurrentMap loadManagementPeons; private LoadQueuePeon loadQueuePeon; private MetadataRuleManager metadataRuleManager; - private SegmentReplicantLookup segmentReplicantLookup; private CountDownLatch leaderAnnouncerLatch; + private CountDownLatch leaderUnannouncerLatch; private PathChildrenCache pathChildrenCache; private DruidCoordinatorConfig druidCoordinatorConfig; private ObjectMapper objectMapper; private JacksonConfigManager configManager; + private DruidNode druidNode; private static final String LOADPATH = "/druid/loadqueue/localhost:1234"; + private static final long COORDINATOR_START_DELAY = 1; + private static final long COORDINATOR_PERIOD = 100; @Before public void setUp() throws Exception @@ -108,9 +111,8 @@ public void setUp() throws Exception setupServerAndCurator(); curator.start(); curator.create().creatingParentsIfNeeded().forPath(LOADPATH); - segmentReplicantLookup = null; objectMapper = new DefaultObjectMapper(); - druidCoordinatorConfig = new TestDruidCoordinatorConfig(new Duration(1), new Duration(6000), null, null, null, false, false); + druidCoordinatorConfig = new TestDruidCoordinatorConfig(new Duration(COORDINATOR_START_DELAY), new Duration(COORDINATOR_PERIOD), null, null, null, false, false); pathChildrenCache = new PathChildrenCache(curator, LOADPATH, true, true, Execs.singleThreaded("coordinator_test_path_children_cache-%d")); loadQueuePeon = new LoadQueuePeon( curator, @@ -120,6 +122,7 @@ public void setUp() throws Exception Execs.singleThreaded("coordinator_test_load_queue_peon-%d"), druidCoordinatorConfig ); + druidNode = new DruidNode("hey", "what", 1234); loadManagementPeons = new MapMaker().makeMap(); scheduledExecutorFactory = new ScheduledExecutorFactory() { @@ -130,6 +133,7 @@ public ScheduledExecutorService create(int corePoolSize, final String nameFormat } }; leaderAnnouncerLatch = new CountDownLatch(1); + leaderUnannouncerLatch = new CountDownLatch(1); coordinator = new DruidCoordinator( druidCoordinatorConfig, new ZkPathsConfig() @@ -157,8 +161,14 @@ public void announce(DruidNode node) // count down when this coordinator becomes the leader leaderAnnouncerLatch.countDown(); } + + @Override + public void unannounce(DruidNode node) + { + leaderUnannouncerLatch.countDown(); + } }, - new DruidNode("hey", "what", 1234), + druidNode, loadManagementPeons ); } @@ -235,19 +245,21 @@ public String getInventoryPath() EasyMock.verify(metadataRuleManager); } - @Test + @Test(timeout = 1500L) public void testCoordinatorRun() throws Exception{ String dataSource = "dataSource1"; String tier= "hot"; Rule foreverLoadRule = new ForeverLoadRule(ImmutableMap.of(tier, 2)); EasyMock.expect(metadataRuleManager.getRulesWithDefault(EasyMock.anyString())) .andReturn(ImmutableList.of(foreverLoadRule)).atLeastOnce(); + metadataRuleManager.stop(); + EasyMock.expectLastCall().once(); EasyMock.replay(metadataRuleManager); DruidDataSource[] druidDataSources = { new DruidDataSource(dataSource, new HashMap()) }; - DataSegment dataSegment = new DataSegment(dataSource, new Interval("2010-01-01/P1D"), "v1", null, null, null, null, 0x9, 0); + final DataSegment dataSegment = new DataSegment(dataSource, new Interval("2010-01-01/P1D"), "v1", null, null, null, null, 0x9, 0); druidDataSources[0].addSegment("0", dataSegment); EasyMock.expect(databaseSegmentManager.isStarted()).andReturn(true).anyTimes(); @@ -259,17 +271,7 @@ public void testCoordinatorRun() throws Exception{ EasyMock.expect(immutableDruidDataSource.getSegments()) .andReturn(ImmutableSet.of(dataSegment)).atLeastOnce(); EasyMock.replay(immutableDruidDataSource); - EasyMock.expect(druidServer.toImmutableDruidServer()).andReturn( - new ImmutableDruidServer( - new DruidServerMetadata("server1", "localhost", 5L, "historical", tier, 0), - 1L, - ImmutableMap.of(), - ImmutableMap.of() - ) - ).atLeastOnce(); - EasyMock.expect(druidServer.isAssignable()).andReturn(true).anyTimes(); - EasyMock.expect(druidServer.getName()).andReturn("server1").anyTimes(); - EasyMock.replay(druidServer); + druidServer = new DruidServer("server1", "localhost", 5L, "historical", tier, 0); loadManagementPeons.put("server1", loadQueuePeon); EasyMock.expect(serverInventoryView.getInventory()).andReturn( @@ -282,18 +284,21 @@ public void testCoordinatorRun() throws Exception{ throw ex; } EasyMock.expect(serverInventoryView.isStarted()).andReturn(true).anyTimes(); + serverInventoryView.stop(); + EasyMock.expectLastCall().once(); EasyMock.replay(serverInventoryView); coordinator.start(); // Wait for this coordinator to become leader try { - Assert.assertTrue(leaderAnnouncerLatch.await(4, TimeUnit.SECONDS)); + leaderAnnouncerLatch.await(); } catch (InterruptedException ex) { throw ex; } // This coordinator should be leader by now Assert.assertTrue(coordinator.isLeader()); + Assert.assertEquals(druidNode.getHostAndPort(), coordinator.getCurrentLeader()); final CountDownLatch assignSegmentLatch = new CountDownLatch(1); pathChildrenCache.getListenable().addListener( @@ -306,6 +311,8 @@ public void childEvent( { if(pathChildrenCacheEvent.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)){ //Coordinator should try to assign segment to druidServer historical + //Simulate historical loading segment + druidServer.addDataSegment(dataSegment.getIdentifier(), dataSegment); assignSegmentLatch.countDown(); } } @@ -317,20 +324,33 @@ public void childEvent( catch (Exception ex) { throw ex; } - Assert.assertTrue(assignSegmentLatch.await(4, TimeUnit.SECONDS)); + + Assert.assertTrue(assignSegmentLatch.await(2, TimeUnit.SECONDS)); + Assert.assertEquals(ImmutableMap.of(dataSource, 100.0), coordinator.getLoadStatus()); curator.delete().guaranteed().forPath(ZKPaths.makePath(LOADPATH, dataSegment.getIdentifier())); + // Wait for coordinator thread to run so that replication status is updated + while (coordinator.getSegmentAvailability().snapshot().get(dataSource) != 0) { + Thread.sleep(50); + } + Map segmentAvailability = coordinator.getSegmentAvailability().snapshot(); + Assert.assertEquals(1, segmentAvailability.size()); + Assert.assertEquals(0l, segmentAvailability.get(dataSource)); + Map> replicationStatus = coordinator.getReplicationStatus(); Assert.assertNotNull(replicationStatus); Assert.assertEquals(1, replicationStatus.entrySet().size()); + CountingMap dataSourceMap = replicationStatus.get(tier); Assert.assertNotNull(dataSourceMap); Assert.assertEquals(1, dataSourceMap.size()); Assert.assertNotNull(dataSourceMap.get(dataSource)); - // ServerInventoryView and historical DruidServer are mocked - // so the historical will never announce the segment and thus server inventory view will not be updated - // The load rules asks for 2 replicas, therefore 2 replicas will still be pending - Assert.assertEquals(2l, dataSourceMap.get(dataSource).get()); - EasyMock.verify(druidServer); + // Simulated the adding of segment to druidServer during SegmentChangeRequestLoad event + // The load rules asks for 2 replicas, therefore 1 replica should still be pending + Assert.assertEquals(1l, dataSourceMap.get(dataSource).get()); + coordinator.stop(); + leaderUnannouncerLatch.await(); + Assert.assertFalse(coordinator.isLeader()); + Assert.assertNull(coordinator.getCurrentLeader()); EasyMock.verify(serverInventoryView); EasyMock.verify(metadataRuleManager); } diff --git a/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java b/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java index a17e5f2f35c4..f24ed60f5fd9 100644 --- a/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java +++ b/server/src/test/java/io/druid/server/http/DatasourcesResourceTest.java @@ -20,6 +20,7 @@ package io.druid.server.http; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; import io.druid.client.DruidDataSource; import io.druid.client.DruidServer; import io.druid.client.InventoryView; @@ -27,6 +28,7 @@ import org.easymock.EasyMock; import org.joda.time.Interval; import org.junit.Assert; +import org.junit.Before; import org.junit.Test; import javax.ws.rs.core.Response; @@ -35,21 +37,71 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; public class DatasourcesResourceTest { + private InventoryView inventoryView; + private DruidServer server; + private List listDataSources; + private List dataSegmentList; + + @Before + public void setUp() + { + inventoryView = EasyMock.createStrictMock(InventoryView.class); + server = EasyMock.createStrictMock(DruidServer.class); + dataSegmentList = new ArrayList<>(); + dataSegmentList.add( + new DataSegment( + "datasource1", + new Interval("2010-01-01/P1D"), + null, + null, + null, + null, + null, + 0x9, + 0 + ) + ); + dataSegmentList.add( + new DataSegment( + "datasource1", + new Interval("2010-01-22/P1D"), + null, + null, + null, + null, + null, + 0x9, + 0 + ) + ); + dataSegmentList.add( + new DataSegment( + "datasource2", + new Interval("2010-01-01/P1D"), + null, + null, + null, + null, + null, + 0x9, + 0 + ) + ); + listDataSources = new ArrayList<>(); + listDataSources.add(new DruidDataSource("datasource1", new HashMap()).addSegment("part1", dataSegmentList.get(0))); + listDataSources.add(new DruidDataSource("datasource2", new HashMap()).addSegment("part1", dataSegmentList.get(1))); + } @Test public void testGetFullQueryableDataSources() throws Exception { - InventoryView inventoryView = EasyMock.createStrictMock(InventoryView.class); - DruidServer server = EasyMock.createStrictMock(DruidServer.class); - DruidDataSource[] druidDataSources = { - new DruidDataSource("datasource1", new HashMap()), - new DruidDataSource("datasource2", new HashMap()) - }; EasyMock.expect(server.getDataSources()).andReturn( - ImmutableList.of(druidDataSources[0], druidDataSources[1]) + ImmutableList.of(listDataSources.get(0), listDataSources.get(1)) ).atLeastOnce(); EasyMock.expect(inventoryView.getInventory()).andReturn( ImmutableList.of(server) @@ -57,15 +109,15 @@ public void testGetFullQueryableDataSources() throws Exception EasyMock.replay(inventoryView, server); DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null); Response response = datasourcesResource.getQueryableDataSources("full", null); - Set result = (Set)response.getEntity(); + Set result = (Set) response.getEntity(); DruidDataSource[] resultantDruidDataSources = new DruidDataSource[result.size()]; result.toArray(resultantDruidDataSources); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(2, resultantDruidDataSources.length); - Assert.assertArrayEquals(druidDataSources, resultantDruidDataSources); + Assert.assertArrayEquals(listDataSources.toArray(), resultantDruidDataSources); response = datasourcesResource.getQueryableDataSources(null, null); - List result1 = (List)response.getEntity(); + List result1 = (List) response.getEntity(); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(2, result1.size()); Assert.assertTrue(result1.contains("datasource1")); @@ -76,29 +128,8 @@ public void testGetFullQueryableDataSources() throws Exception @Test public void testGetSimpleQueryableDataSources() throws Exception { - InventoryView inventoryView = EasyMock.createStrictMock(InventoryView.class); - DruidServer server = EasyMock.createStrictMock(DruidServer.class); - List> input = new ArrayList(2); - HashMap dataSourceProp1 = new HashMap<>(); - dataSourceProp1.put("name", "datasource1"); - dataSourceProp1.put("partitionName", "partition"); - dataSourceProp1.put("datasegment", - new DataSegment("datasource1", new Interval("2010-01-01/P1D"), null, null, null, null, null, 0x9, 0)); - - HashMap dataSourceProp2 = new HashMap<>(); - dataSourceProp2.put("name", "datasource2"); - dataSourceProp2.put("partitionName", "partition"); - dataSourceProp2.put("datasegment", - new DataSegment("datasource2", new Interval("2010-01-01/P1D"), null, null, null, null, null, 0x9, 0)); - input.add(dataSourceProp1); - input.add(dataSourceProp2); - List listDataSources = new ArrayList<>(); - for(Map entry : input){ - listDataSources.add(new DruidDataSource(entry.get("name").toString(), new HashMap()) - .addSegment(entry.get("partitionName").toString(), (DataSegment)entry.get("datasegment"))); - } EasyMock.expect(server.getDataSources()).andReturn( - listDataSources + listDataSources ).atLeastOnce(); EasyMock.expect(server.getDataSource("datasource1")).andReturn( listDataSources.get(0) @@ -116,10 +147,10 @@ public void testGetSimpleQueryableDataSources() throws Exception DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null); Response response = datasourcesResource.getQueryableDataSources(null, "simple"); Assert.assertEquals(200, response.getStatus()); - List> results = (List>)response.getEntity(); + List> results = (List>) response.getEntity(); int index = 0; - for(Map entry : results){ - Assert.assertEquals(input.get(index).get("name"), entry.get("name").toString()); + for (Map entry : results) { + Assert.assertEquals(listDataSources.get(index).getName(), entry.get("name").toString()); Assert.assertTrue(((Map) ((Map) entry.get("properties")).get("tiers")).containsKey(null)); Assert.assertNotNull((((Map) entry.get("properties")).get("segments"))); Assert.assertEquals(1, ((Map) ((Map) entry.get("properties")).get("segments")).get("count")); @@ -131,8 +162,6 @@ public void testGetSimpleQueryableDataSources() throws Exception @Test public void testFullGetTheDataSource() throws Exception { - InventoryView inventoryView = EasyMock.createStrictMock(InventoryView.class); - DruidServer server = EasyMock.createStrictMock(DruidServer.class); DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap()); EasyMock.expect(server.getDataSource("datasource1")).andReturn( dataSource1 @@ -144,7 +173,7 @@ public void testFullGetTheDataSource() throws Exception EasyMock.replay(inventoryView, server); DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null); Response response = datasourcesResource.getTheDataSource("datasource1", "full"); - DruidDataSource result = (DruidDataSource)response.getEntity(); + DruidDataSource result = (DruidDataSource) response.getEntity(); Assert.assertEquals(200, response.getStatus()); Assert.assertEquals(dataSource1, result); EasyMock.verify(inventoryView, server); @@ -153,8 +182,6 @@ public void testFullGetTheDataSource() throws Exception @Test public void testNullGetTheDataSource() throws Exception { - InventoryView inventoryView = EasyMock.createStrictMock(InventoryView.class); - DruidServer server = EasyMock.createStrictMock(DruidServer.class); EasyMock.expect(server.getDataSource("none")).andReturn(null).atLeastOnce(); EasyMock.expect(inventoryView.getInventory()).andReturn( ImmutableList.of(server) @@ -169,11 +196,11 @@ public void testNullGetTheDataSource() throws Exception @Test public void testSimpleGetTheDataSource() throws Exception { - InventoryView inventoryView = EasyMock.createStrictMock(InventoryView.class); - DruidServer server = EasyMock.createStrictMock(DruidServer.class); DruidDataSource dataSource1 = new DruidDataSource("datasource1", new HashMap()); - dataSource1.addSegment("partition", - new DataSegment("datasegment1", new Interval("2010-01-01/P1D"), null, null, null, null, null, 0x9, 0)); + dataSource1.addSegment( + "partition", + new DataSegment("datasegment1", new Interval("2010-01-01/P1D"), null, null, null, null, null, 0x9, 0) + ); EasyMock.expect(server.getDataSource("datasource1")).andReturn( dataSource1 ).atLeastOnce(); @@ -186,11 +213,129 @@ public void testSimpleGetTheDataSource() throws Exception DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null); Response response = datasourcesResource.getTheDataSource("datasource1", null); Assert.assertEquals(200, response.getStatus()); - Map> result = (Map>)response.getEntity(); - Assert.assertEquals(1, ((Map)(result.get("tiers").get(null))).get("segmentCount")); + Map> result = (Map>) response.getEntity(); + Assert.assertEquals(1, ((Map) (result.get("tiers").get(null))).get("segmentCount")); Assert.assertNotNull(result.get("segments")); Assert.assertNotNull(result.get("segments").get("minTime").toString(), "2010-01-01T00:00:00.000Z"); Assert.assertNotNull(result.get("segments").get("maxTime").toString(), "2010-01-02T00:00:00.000Z"); EasyMock.verify(inventoryView, server); } + + @Test + public void testGetSegmentDataSourceIntervals() + { + server = new DruidServer("who", "host", 1234, "historical", "tier1", 0); + server.addDataSegment(dataSegmentList.get(0).getIdentifier(), dataSegmentList.get(0)); + server.addDataSegment(dataSegmentList.get(1).getIdentifier(), dataSegmentList.get(1)); + server.addDataSegment(dataSegmentList.get(2).getIdentifier(), dataSegmentList.get(2)); + EasyMock.expect(inventoryView.getInventory()).andReturn( + ImmutableList.of(server) + ).atLeastOnce(); + EasyMock.replay(inventoryView); + + List expectedIntervals = new ArrayList<>(); + expectedIntervals.add(new Interval("2010-01-22T00:00:00.000Z/2010-01-23T00:00:00.000Z")); + expectedIntervals.add(new Interval("2010-01-01T00:00:00.000Z/2010-01-02T00:00:00.000Z")); + DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null); + + Response response = datasourcesResource.getSegmentDataSourceIntervals("invalidDataSource", null, null); + Assert.assertEquals(response.getEntity(), null); + + response = datasourcesResource.getSegmentDataSourceIntervals("datasource1", null, null); + TreeSet actualIntervals = (TreeSet) response.getEntity(); + Assert.assertEquals(2, actualIntervals.size()); + Assert.assertEquals(expectedIntervals.get(0), actualIntervals.first()); + Assert.assertEquals(expectedIntervals.get(1), actualIntervals.last()); + + response = datasourcesResource.getSegmentDataSourceIntervals("datasource1", "simple", null); + TreeMap> results = (TreeMap) response.getEntity(); + Assert.assertEquals(2, results.size()); + Assert.assertEquals(expectedIntervals.get(0), results.firstKey()); + Assert.assertEquals(expectedIntervals.get(1), results.lastKey()); + Assert.assertEquals(1, results.firstEntry().getValue().get("count")); + Assert.assertEquals(1, results.lastEntry().getValue().get("count")); + + response = datasourcesResource.getSegmentDataSourceIntervals("datasource1", null, "full"); + results = ((TreeMap>) response.getEntity()); + int i = 1; + for (Map.Entry> entry : results.entrySet()) { + Assert.assertEquals(dataSegmentList.get(i).getInterval(), entry.getKey()); + Assert.assertEquals( + dataSegmentList.get(i), + ((Map) entry.getValue().get(dataSegmentList.get(i).getIdentifier())).get( + "metadata" + ) + ); + i--; + } + EasyMock.verify(inventoryView); + } + + @Test + public void testGetSegmentDataSourceSpecificInterval() + { + server = new DruidServer("who", "host", 1234, "historical", "tier1", 0); + server.addDataSegment(dataSegmentList.get(0).getIdentifier(), dataSegmentList.get(0)); + server.addDataSegment(dataSegmentList.get(1).getIdentifier(), dataSegmentList.get(1)); + server.addDataSegment(dataSegmentList.get(2).getIdentifier(), dataSegmentList.get(2)); + EasyMock.expect(inventoryView.getInventory()).andReturn( + ImmutableList.of(server) + ).atLeastOnce(); + EasyMock.replay(inventoryView); + + DatasourcesResource datasourcesResource = new DatasourcesResource(inventoryView, null, null); + Response response = datasourcesResource.getSegmentDataSourceSpecificInterval( + "invalidDataSource", + "2010-01-01/P1D", + null, + null + ); + Assert.assertEquals(null, response.getEntity()); + + response = datasourcesResource.getSegmentDataSourceSpecificInterval( + "datasource1", + "2010-03-01/P1D", + null, + null + ); // interval not present in the datasource + Assert.assertEquals(ImmutableSet.of(), response.getEntity()); + + response = datasourcesResource.getSegmentDataSourceSpecificInterval("datasource1", "2010-01-01/P1D", null, null); + Assert.assertEquals(ImmutableSet.of(dataSegmentList.get(0).getIdentifier()), response.getEntity()); + + response = datasourcesResource.getSegmentDataSourceSpecificInterval("datasource1", "2010-01-01/P1M", null, null); + Assert.assertEquals( + ImmutableSet.of(dataSegmentList.get(1).getIdentifier(), dataSegmentList.get(0).getIdentifier()), + response.getEntity() + ); + + response = datasourcesResource.getSegmentDataSourceSpecificInterval( + "datasource1", + "2010-01-01/P1M", + "simple", + null + ); + HashMap> results = ((HashMap>) response.getEntity()); + Assert.assertEquals(2, results.size()); + int i; + for (i = 0; i < 2; i++) { + Assert.assertTrue(results.containsKey(dataSegmentList.get(i).getInterval())); + Assert.assertEquals(1, (results.get(dataSegmentList.get(i).getInterval())).get("count")); + } + + response = datasourcesResource.getSegmentDataSourceSpecificInterval("datasource1", "2010-01-01/P1M", null, "full"); + TreeMap> results1 = ((TreeMap>) response.getEntity()); + i = 1; + for (Map.Entry> entry : results1.entrySet()) { + Assert.assertEquals(dataSegmentList.get(i).getInterval(), entry.getKey()); + Assert.assertEquals( + dataSegmentList.get(i), + ((Map) entry.getValue().get(dataSegmentList.get(i).getIdentifier())).get( + "metadata" + ) + ); + i--; + } + EasyMock.verify(inventoryView); + } }