Skip to content

Commit

Permalink
Add NodeProvider interface and ModularHashingNodeProvider
Browse files Browse the repository at this point in the history
  • Loading branch information
rongrong authored and beinan committed Dec 23, 2021
1 parent 9b0712a commit 9e46a70
Show file tree
Hide file tree
Showing 47 changed files with 182 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.accumulo.serializers.AccumuloRowSerializer;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -168,7 +169,7 @@ public List<HostAddress> getAddresses()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return addresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -85,7 +86,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
// discard the port number
return ImmutableList.of(HostAddress.fromString(host.getHostText()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -108,7 +109,7 @@ public List<HostAddress> getAddresses()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return ImmutableList.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -95,7 +96,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return ImmutableList.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -79,7 +80,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return ImmutableList.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -97,7 +98,7 @@ public List<HostAddress> getAddresses()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return addresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.druid.metadata.DruidSegmentInfo;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -106,7 +107,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return address.map(ImmutableList::of).orElse(ImmutableList.of());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -81,7 +82,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return address.map(host -> ImmutableList.of(HostAddress.fromString(host)))
.orElseGet(ImmutableList::of);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -86,7 +87,7 @@ public List<HostAddress> getAddresses()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return addresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -70,7 +71,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return hostAddresses;
}
Expand Down
17 changes: 3 additions & 14 deletions presto-hive/src/main/java/com/facebook/presto/hive/HiveSplit.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.facebook.presto.spi.ColumnHandle;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.SplitWeight;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -34,7 +34,6 @@
import java.util.OptionalLong;
import java.util.Set;

import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.SOFT_AFFINITY;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkArgument;
Expand Down Expand Up @@ -209,20 +208,10 @@ public List<HostAddress> getAddresses()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
if (sortedCandidates == null || sortedCandidates.isEmpty()) {
throw new PrestoException(NO_NODES_AVAILABLE, "sortedCandidates is null or empty for HiveSplit");
}

if (getNodeSelectionStrategy() == SOFT_AFFINITY) {
// Use + 1 as secondary hash for now, would always get a different position from the first hash.
int size = sortedCandidates.size();
int mod = path.hashCode() % size;
int position = mod < 0 ? mod + size : mod;
return ImmutableList.of(
sortedCandidates.get(position),
sortedCandidates.get((position + 1) % size));
return nodeProvider.get(path, 2);
}
return addresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand All @@ -27,7 +27,6 @@
import java.util.List;
import java.util.Map;

import static com.facebook.presto.spi.StandardErrorCode.NO_NODES_AVAILABLE;
import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.SOFT_AFFINITY;
import static com.google.common.base.MoreObjects.toStringHelper;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -106,20 +105,10 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
return nodeSelectionStrategy;
}
@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
if (sortedCandidates == null || sortedCandidates.isEmpty()) {
throw new PrestoException(NO_NODES_AVAILABLE, "sortedCandidates is null or empty for HiveSplit");
}

if (getNodeSelectionStrategy() == SOFT_AFFINITY) {
// Use + 1 as secondary hash for now, would always get a different position from the first hash.
int size = sortedCandidates.size();
int mod = path.hashCode() % size;
int position = mod < 0 ? mod + size : mod;
return ImmutableList.of(
sortedCandidates.get(position),
sortedCandidates.get((position + 1) % size));
return nodeProvider.get(path, 2);
}
return addresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -59,7 +60,7 @@ public List<HostAddress> getAddresses()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return addresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -137,7 +138,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return ImmutableList.of(leader);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -67,7 +68,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return ImmutableList.of();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.common.predicate.TupleDomain;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.SchemaTableName;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -71,7 +72,7 @@ public NodeSelectionStrategy getNodeSelectionStrategy()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return ImmutableList.of(address);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.metadata.QualifiedTablePrefix;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -64,7 +65,7 @@ public List<HostAddress> getAddresses()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return addresses;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.spi.ConnectorId;
import com.facebook.presto.spi.ConnectorSplit;
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.NodeProvider;
import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down Expand Up @@ -78,7 +79,7 @@ public List<HostAddress> getAddresses()
}

@Override
public List<HostAddress> getPreferredNodes(List<HostAddress> sortedCandidates)
public List<HostAddress> getPreferredNodes(NodeProvider nodeProvider)
{
return addresses;
}
Expand Down
Loading

0 comments on commit 9e46a70

Please sign in to comment.