Skip to content

Commit

Permalink
Task Retry Level Scheduler Changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sumanth43 committed Aug 9, 2022
1 parent c5ed67f commit 15dcde2
Show file tree
Hide file tree
Showing 72 changed files with 8,029 additions and 143 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -768,6 +769,12 @@ Number of files should not cross MinValue(max-splits-to-group, Total number of f
return resultConnectorSplits;
}

@Override
public Optional<List<Object>> getTableExecuteSplitsInfo()
{
return Optional.empty();
}

private boolean getSmallerSplits(List<HiveSplitWrapper> hiveSplitWrappers, Multimap<Integer, HiveSplit> bucketNumberHiveSplits,
long maxSplitBytes, int replicationFactor, ImmutableList.Builder<ConnectorSplit> connectorSplitList)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.collect.ImmutableMap;
import io.prestosql.spi.HostAddress;
import io.prestosql.spi.connector.ConnectorSplit;
import org.openjdk.jol.info.ClassLayout;

import java.util.List;
import java.util.OptionalInt;
Expand All @@ -31,6 +32,7 @@
public class HiveSplitWrapper
implements ConnectorSplit
{
private static final int INSTANCE_SIZE = ClassLayout.parseClass(HiveSplitWrapper.class).instanceSize();
private final List<HiveSplit> splits;
private final OptionalInt bucketNumber;

Expand Down Expand Up @@ -147,4 +149,11 @@ public int getSplitCount()
{
return splits.size();
}

//TODO(SURYA): need to check if any others need to be added.
@Override
public long getRetainedSizeInBytes()
{
return INSTANCE_SIZE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5568,7 +5568,8 @@ public void testRuseExchangeGroupSplitsMatchingBetweenProducerConsumer()
new DynamicFilterService(new LocalStateStoreProvider(
new SeedStoreManager(new FileSystemClientManager()))),
new QuerySnapshotManager(stageId.getQueryId(), NOOP_RECOVERY_UTILS, TEST_SESSION),
new QueryRecoveryManager(NOOP_RECOVERY_UTILS, TEST_SESSION, stageId.getQueryId()));
new QueryRecoveryManager(NOOP_RECOVERY_UTILS, TEST_SESSION, stageId.getQueryId()),
Optional.empty());

Set<Split> splits = createAndGetSplits(10);
Multimap<InternalNode, Split> producerAssignment = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values()), Optional.of(producerStage)).getAssignments();
Expand All @@ -5594,7 +5595,8 @@ public void testRuseExchangeGroupSplitsMatchingBetweenProducerConsumer()
new DynamicFilterService(new LocalStateStoreProvider(
new SeedStoreManager(new FileSystemClientManager()))),
new QuerySnapshotManager(stageId.getQueryId(), NOOP_RECOVERY_UTILS, TEST_SESSION),
new QueryRecoveryManager(NOOP_RECOVERY_UTILS, TEST_SESSION, stageId.getQueryId()));
new QueryRecoveryManager(NOOP_RECOVERY_UTILS, TEST_SESSION, stageId.getQueryId()),
Optional.empty());
Multimap<InternalNode, Split> consumerAssignment = nodeSelector.computeAssignments(splits, ImmutableList.copyOf(taskMap.values()), Optional.of(stage)).getAssignments();

assertEquals(consumerAssignment.size(), consumerAssignment.size());
Expand Down Expand Up @@ -5650,7 +5652,8 @@ public void testRuseExchangeSplitsGroupNotMatchingBetweenProducerConsumer()
new DynamicFilterService(new LocalStateStoreProvider(
new SeedStoreManager(new FileSystemClientManager()))),
new QuerySnapshotManager(stageId.getQueryId(), NOOP_RECOVERY_UTILS, TEST_SESSION),
new QueryRecoveryManager(NOOP_RECOVERY_UTILS, TEST_SESSION, stageId.getQueryId()));
new QueryRecoveryManager(NOOP_RECOVERY_UTILS, TEST_SESSION, stageId.getQueryId()),
Optional.empty());

Set<Split> producerSplits = createAndGetSplits(10);
Multimap<InternalNode, Split> producerAssignment = nodeSelector.computeAssignments(producerSplits, ImmutableList.copyOf(taskMap.values()), Optional.of(producerStage)).getAssignments();
Expand All @@ -5676,7 +5679,8 @@ public void testRuseExchangeSplitsGroupNotMatchingBetweenProducerConsumer()
new DynamicFilterService(new LocalStateStoreProvider(
new SeedStoreManager(new FileSystemClientManager()))),
new QuerySnapshotManager(stageId.getQueryId(), NOOP_RECOVERY_UTILS, TEST_SESSION),
new QueryRecoveryManager(NOOP_RECOVERY_UTILS, TEST_SESSION, stageId.getQueryId()));
new QueryRecoveryManager(NOOP_RECOVERY_UTILS, TEST_SESSION, stageId.getQueryId()),
Optional.empty());
Set<Split> consumerSplits = createAndGetSplits(50);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
package io.prestosql.execution;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.DataSize;
import io.airlift.units.Duration;
Expand All @@ -30,7 +30,7 @@
import io.prestosql.sql.planner.Plan;

import java.util.List;
import java.util.Set;
import java.util.Map;
import java.util.function.Consumer;

import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -109,14 +109,14 @@ class QueryOutputInfo
{
private final List<String> columnNames;
private final List<Type> columnTypes;
private final Set<TaskLocation> bufferLocations;
private final Map<TaskId, TaskLocation> bufferLocations;
private final boolean noMoreBufferLocations;

public QueryOutputInfo(List<String> columnNames, List<Type> columnTypes, Set<TaskLocation> bufferLocations, boolean noMoreBufferLocations)
public QueryOutputInfo(List<String> columnNames, List<Type> columnTypes, Map<TaskId, TaskLocation> bufferLocations, boolean noMoreBufferLocations)
{
this.columnNames = ImmutableList.copyOf(requireNonNull(columnNames, "columnNames is null"));
this.columnTypes = ImmutableList.copyOf(requireNonNull(columnTypes, "columnTypes is null"));
this.bufferLocations = ImmutableSet.copyOf(requireNonNull(bufferLocations, "bufferLocations is null"));
this.bufferLocations = ImmutableMap.copyOf(requireNonNull(bufferLocations, "bufferLocations is null"));
this.noMoreBufferLocations = noMoreBufferLocations;
}

Expand All @@ -130,7 +130,7 @@ public List<Type> getColumnTypes()
return columnTypes;
}

public Set<TaskLocation> getBufferLocations()
public Map<TaskId, TaskLocation> getBufferLocations()
{
return bufferLocations;
}
Expand Down
Loading

0 comments on commit 15dcde2

Please sign in to comment.