Skip to content

Commit

Permalink
Add a ServerType for peons (apache#4295)
Browse files Browse the repository at this point in the history
* Add a ServerType for peons

* Add toString() method, toString() test, unsupported type check

* Use ServerType enum in DruidServer and DruidServerMetadata
  • Loading branch information
jon-wei authored and pjain1 committed May 22, 2017
1 parent 8ec3a29 commit e043bf8
Show file tree
Hide file tree
Showing 35 changed files with 217 additions and 118 deletions.
10 changes: 8 additions & 2 deletions server/src/main/java/io/druid/client/DruidServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class DruidServer implements Comparable
public DruidServer(
DruidNode node,
DruidServerConfig config,
String type
ServerType type
)
{
this(
Expand All @@ -76,7 +76,7 @@ public DruidServer(
@JsonProperty("name") String name,
@JsonProperty("host") String host,
@JsonProperty("maxSize") long maxSize,
@JsonProperty("type") String type,
@JsonProperty("type") ServerType type,
@JsonProperty("tier") String tier,
@JsonProperty("priority") int priority
)
Expand All @@ -87,6 +87,7 @@ public DruidServer(
this.segments = new ConcurrentHashMap<String, DataSegment>();
}

@JsonProperty
public String getName()
{
return metadata.getName();
Expand All @@ -97,6 +98,7 @@ public DruidServerMetadata getMetadata()
return metadata;
}

@JsonProperty
public String getHost()
{
return metadata.getHost();
Expand All @@ -107,16 +109,19 @@ public long getCurrSize()
return currSize;
}

@JsonProperty
public long getMaxSize()
{
return metadata.getMaxSize();
}

@JsonProperty
public ServerType getType()
{
return metadata.getType();
}

@JsonProperty
public String getTier()
{
return metadata.getTier();
Expand All @@ -127,6 +132,7 @@ public boolean segmentReplicatable()
return metadata.segmentReplicatable();
}

@JsonProperty
public int getPriority()
{
return metadata.getPriority();
Expand Down
8 changes: 5 additions & 3 deletions server/src/main/java/io/druid/guice/NodeTypeConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,22 @@

package io.druid.guice;

import io.druid.server.coordination.ServerType;

/**
*/
public class NodeTypeConfig
{
private final String nodeType;
private final ServerType nodeType;

public NodeTypeConfig(
String nodeType
ServerType nodeType
)
{
this.nodeType = nodeType;
}

public String getNodeType()
public ServerType getNodeType()
{
return nodeType;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public DruidServerMetadata(
@JsonProperty("name") String name,
@JsonProperty("host") String host,
@JsonProperty("maxSize") long maxSize,
@JsonProperty("type") String type,
@JsonProperty("type") ServerType type,
@JsonProperty("tier") String tier,
@JsonProperty("priority") int priority
)
Expand All @@ -47,7 +47,7 @@ public DruidServerMetadata(
this.host = host;
this.maxSize = maxSize;
this.tier = tier;
this.type = ServerType.fromString(type);
this.type = type;
this.priority = priority;
}

Expand Down
41 changes: 36 additions & 5 deletions server/src/main/java/io/druid/server/coordination/ServerType.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,33 @@

package io.druid.server.coordination;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

/**
* This enum represents types of druid services that hold segments.
* <p>
* These types are externally visible (e.g., from the output of /druid/coordinator/v1/servers).
* <p>
* For backwards compatibility, when presenting these types externally, the toString() representation
* of the enum should be used.
* <p>
* The toString() method converts the enum name() to lowercase and replaces underscores with hyphens,
* which is the format expected for the server type string prior to the patch that introduced ServerType:
* https://github.com/druid-io/druid/pull/4148
*/
public enum ServerType
{
HISTORICAL,
BRIDGE,
INDEXER_EXECUTOR {
@Override
public boolean isSegmentReplicationTarget()
{
return false;
}
},

REALTIME {
@Override
public boolean isSegmentReplicationTarget()
Expand All @@ -33,12 +56,12 @@ public boolean isSegmentReplicationTarget()

/**
* Indicates this type of node is able to be a target of segment replication.
*
* @return true if it is available for replication
*
* @see io.druid.server.coordinator.rules.LoadRule
*/
boolean isSegmentReplicationTarget()
public boolean isSegmentReplicationTarget()
{
return true;
}
Expand All @@ -48,13 +71,21 @@ boolean isSegmentReplicationTarget()
*
* @return true if it is available for broadcast.
*/
boolean isSegmentBroadcastTarget()
public boolean isSegmentBroadcastTarget()
{
return true;
}

static ServerType fromString(String type)
@JsonCreator
public static ServerType fromString(String type)
{
return ServerType.valueOf(type.toUpperCase().replace("-", "_"));
}

@Override
@JsonValue
public String toString()
{
return ServerType.valueOf(type.toUpperCase());
return name().toLowerCase().replace("_", "-");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ public void add(ServerHolder serverHolder)
case BRIDGE:
addHistorical(serverHolder);
break;
case INDEXER_EXECUTOR:
throw new IAE("unsupported server type[%s]", serverHolder.getServer().getType());
default:
throw new IAE("unknown server type[%s]", serverHolder.getServer().getType());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ private static Map<String, Object> makeSimpleServer(DruidServer input)
return new ImmutableMap.Builder<String, Object>()
.put("host", input.getHost())
.put("tier", input.getTier())
.put("type", input.getType())
.put("type", input.getType().toString())
.put("priority", input.getPriority())
.put("currSize", input.getCurrSize())
.put("maxSize", input.getMaxSize())
Expand All @@ -63,7 +63,7 @@ private static Map<String, Object> makeFullServer(DruidServer input)
return new ImmutableMap.Builder<String, Object>()
.put("host", input.getHost())
.put("maxSize", input.getMaxSize())
.put("type", input.getType())
.put("type", input.getType().toString())
.put("tier", input.getTier())
.put("priority", input.getPriority())
.put("segments", input.getSegments())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.druid.query.QueryWatcher;
import io.druid.query.TableDataSource;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
Expand Down Expand Up @@ -98,7 +99,7 @@ public void testSingleServerAddedRemovedSegment() throws Exception
"localhost:1234",
"localhost:1234",
10000000L,
"historical",
ServerType.HISTORICAL,
"default_tier",
0
);
Expand Down Expand Up @@ -164,7 +165,7 @@ public DruidServer apply(String input)
input,
input,
10000000L,
"historical",
ServerType.HISTORICAL,
"default_tier",
0
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import io.druid.query.QueryRunner;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.CountAggregatorFactory;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.NoneShardSpec;
Expand Down Expand Up @@ -158,7 +159,7 @@ public Comparator<Integer> getComparator() {
@Override
public QueryableDruidServer pick(TreeMap<Integer, Set<QueryableDruidServer>> prioritizedServers, DataSegment segment) {
return new QueryableDruidServer(
new DruidServer("localhost", "localhost", 100, "historical", "a", 10),
new DruidServer("localhost", "localhost", 100, ServerType.HISTORICAL, "a", 10),
EasyMock.createNiceMock(DirectDruidClient.class)
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@
import io.druid.query.topn.TopNQueryQueryToolChest;
import io.druid.query.topn.TopNResultValue;
import io.druid.segment.TestHelper;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.NoneShardSpec;
Expand Down Expand Up @@ -328,11 +329,11 @@ public void setUp() throws Exception
client = makeClient(MoreExecutors.sameThreadExecutor());

servers = new DruidServer[]{
new DruidServer("test1", "test1", 10, "historical", "bye", 0),
new DruidServer("test2", "test2", 10, "historical", "bye", 0),
new DruidServer("test3", "test3", 10, "historical", "bye", 0),
new DruidServer("test4", "test4", 10, "historical", "bye", 0),
new DruidServer("test5", "test5", 10, "historical", "bye", 0)
new DruidServer("test1", "test1", 10, ServerType.HISTORICAL, "bye", 0),
new DruidServer("test2", "test2", 10, ServerType.HISTORICAL, "bye", 0),
new DruidServer("test3", "test3", 10, ServerType.HISTORICAL, "bye", 0),
new DruidServer("test4", "test4", 10, ServerType.HISTORICAL, "bye", 0),
new DruidServer("test5", "test5", 10, ServerType.HISTORICAL, "bye", 0)
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.druid.java.util.common.Pair;
import io.druid.query.TableDataSource;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
Expand Down Expand Up @@ -91,7 +92,7 @@ public void testSingleServerAddedRemovedSegment() throws Exception
"localhost:1234",
"localhost:1234",
10000000L,
"historical",
ServerType.HISTORICAL,
"default_tier",
0
);
Expand Down Expand Up @@ -158,7 +159,7 @@ public DruidServer apply(String input)
input,
input,
10000000L,
"historical",
ServerType.HISTORICAL,
"default_tier",
0
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import io.druid.query.ReflectionQueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.coordination.ServerType;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
Expand Down Expand Up @@ -149,12 +150,12 @@ public void testRun() throws Exception
);

QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
new DruidServer("test1", "localhost", 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment());
QueryableDruidServer queryableDruidServer2 = new QueryableDruidServer(
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
new DruidServer("test1", "localhost", 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
client2
);
serverSelector.addServerAndUpdateSegment(queryableDruidServer2, serverSelector.getSegment());
Expand Down Expand Up @@ -253,7 +254,7 @@ public void testCancel() throws Exception
);

QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
new DruidServer("test1", "localhost", 0, "historical", DruidServer.DEFAULT_TIER, 0),
new DruidServer("test1", "localhost", 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
client1
);
serverSelector.addServerAndUpdateSegment(queryableDruidServer1, serverSelector.getSegment());
Expand Down Expand Up @@ -322,7 +323,7 @@ public void testQueryInterruptionExceptionLogMessage() throws JsonProcessingExce
);

QueryableDruidServer queryableDruidServer = new QueryableDruidServer(
new DruidServer("test1", hostName, 0, "historical", DruidServer.DEFAULT_TIER, 0),
new DruidServer("test1", hostName, 0, ServerType.HISTORICAL, DruidServer.DEFAULT_TIER, 0),
client1
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import io.druid.server.coordination.CuratorDataSegmentServerAnnouncer;
import io.druid.server.coordination.DataSegmentServerAnnouncer;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.server.initialization.BatchDataSegmentAnnouncerConfig;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.timeline.DataSegment;
Expand Down Expand Up @@ -124,7 +125,7 @@ public void setUp() throws Exception
"id",
"host",
Long.MAX_VALUE,
"historical",
ServerType.HISTORICAL,
"tier",
0
);
Expand Down Expand Up @@ -443,7 +444,7 @@ public BatchDataSegmentAnnouncer call()
"id",
"host",
Long.MAX_VALUE,
"historical",
ServerType.HISTORICAL,
"tier",
0
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import io.druid.client.ImmutableSegmentLoadInfo;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.coordination.ServerType;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import junit.framework.Assert;
Expand Down Expand Up @@ -51,7 +52,7 @@ public void testSerde() throws IOException
null,
NoneShardSpec.instance(),
0, 0
), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, "historical", "tier", 1))
), Sets.newHashSet(new DruidServerMetadata("a", "host", 10, ServerType.HISTORICAL, "tier", 1))
);

ImmutableSegmentLoadInfo serde = mapper.readValue(
Expand Down
Loading

0 comments on commit e043bf8

Please sign in to comment.