Skip to content

Commit

Permalink
Implement BinaryFileSpiller
Browse files Browse the repository at this point in the history
BinaryFileSpiller spills data to binary files without any compression.
  • Loading branch information
pnowojski authored and cberner committed Oct 4, 2016
1 parent e953c50 commit 71bb741
Show file tree
Hide file tree
Showing 7 changed files with 331 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spi.type.TypeManager;
import com.facebook.presto.spiller.BinarySpillerFactory;
import com.facebook.presto.spiller.SpillerFactory;
import com.facebook.presto.split.PageSinkManager;
import com.facebook.presto.split.PageSinkProvider;
import com.facebook.presto.split.PageSourceManager;
Expand Down Expand Up @@ -389,6 +391,9 @@ protected void setup(Binder binder)

// Finalizer
binder.bind(FinalizerService.class).in(Scopes.SINGLETON);

// Spiller
binder.bind(SpillerFactory.class).to(BinarySpillerFactory.class).in(Scopes.SINGLETON);
}

@Provides
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.spiller;

import com.facebook.presto.block.PagesSerde;
import com.facebook.presto.spi.Page;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.ListeningExecutorService;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.InputStreamSliceInput;
import io.airlift.slice.OutputStreamSliceOutput;
import io.airlift.slice.RuntimeIOException;
import io.airlift.slice.SliceOutput;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
import static com.facebook.presto.spiller.BinarySpillerFactory.SPILL_PATH;
import static com.facebook.presto.util.ImmutableCollectors.toImmutableList;
import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toList;

public class BinaryFileSpiller
implements Spiller
{
private final Path targetDirectory;
private final Closer closer = Closer.create();
private final BlockEncodingSerde blockEncodingSerde;

private int spillsCount;
private final ListeningExecutorService executor;

public BinaryFileSpiller(BlockEncodingSerde blockEncodingSerde, ListeningExecutorService executor)
{
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.executor = requireNonNull(executor, "executor is null");
try {
this.targetDirectory = Files.createTempDirectory(SPILL_PATH, "presto-spill");
}
catch (IOException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to create spill directory", e);
}
}

@Override
public CompletableFuture<?> spill(Iterator<Page> pageIterator)
{
Path spillPath = getPath(spillsCount++);

return MoreFutures.toCompletableFuture(executor.submit(
() -> writePages(pageIterator, spillPath)
));
}

private void writePages(Iterator<Page> pageIterator, Path spillPath)
{
try (SliceOutput output = new OutputStreamSliceOutput(new BufferedOutputStream(new FileOutputStream(spillPath.toFile())))) {
PagesSerde.writePages(blockEncodingSerde, output, pageIterator);
}
catch (RuntimeIOException | IOException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to spill pages", e);
}
}

@Override
public List<Iterator<Page>> getSpills()
{
return IntStream.range(0, spillsCount)
.mapToObj(i -> readPages(getPath(i)))
.collect(toImmutableList());
}

private Iterator<Page> readPages(Path spillPath)
{
try {
InputStream input = new BufferedInputStream(new FileInputStream(spillPath.toFile()));
closer.register(input);
return PagesSerde.readPages(blockEncodingSerde, new InputStreamSliceInput(input));
}
catch (IOException e) {
throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to read spilled pages", e);
}
}

@Override
public void close()
{
try (Stream<Path> list = Files.list(targetDirectory)) {
closer.close();
for (Path path : list.collect(toList())) {
Files.delete(path);
}
Files.delete(targetDirectory);
}
catch (IOException e) {
throw new PrestoException(
GENERIC_INTERNAL_ERROR,
String.format("Failed to delete directory [%s]", targetDirectory),
e);
}
}

private Path getPath(int spillNumber)
{
return Paths.get(targetDirectory.toAbsolutePath().toString(), String.format("%d.bin", spillNumber));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.facebook.presto.spiller;

import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.type.Type;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.inject.Inject;

import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static java.util.Objects.requireNonNull;
import static java.util.concurrent.Executors.newFixedThreadPool;

public class BinarySpillerFactory
implements SpillerFactory
{
public static final Path SPILL_PATH = Paths.get("/tmp/spills");

private final ListeningExecutorService executor;
private final BlockEncodingSerde blockEncodingSerde;

@Inject
public BinarySpillerFactory(BlockEncodingSerde blockEncodingSerde)
{
this(blockEncodingSerde, MoreExecutors.listeningDecorator(newFixedThreadPool(4, daemonThreadsNamed("binary-spiller-%s"))));
}

public BinarySpillerFactory(BlockEncodingSerde blockEncodingSerde, ListeningExecutorService executor)
{
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
this.executor = requireNonNull(executor, "executor is null");

SPILL_PATH.toFile().mkdirs();
}

@Override
public Spiller create(List<Type> types)
{
return new BinaryFileSpiller(blockEncodingSerde, executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import com.facebook.presto.spi.block.SortOrder;
import com.facebook.presto.spi.predicate.NullableValue;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spiller.SpillerFactory;
import com.facebook.presto.split.MappedRecordSet;
import com.facebook.presto.split.PageSinkManager;
import com.facebook.presto.split.PageSourceProvider;
Expand Down Expand Up @@ -237,6 +238,7 @@ public class LocalExecutionPlanner
private final IndexJoinLookupStats indexJoinLookupStats;
private final DataSize maxPartialAggregationMemorySize;
private final DataSize maxPagePartitioningBufferSize;
private final SpillerFactory spillerFactory;

@Inject
public LocalExecutionPlanner(
Expand All @@ -252,7 +254,8 @@ public LocalExecutionPlanner(
JoinFilterFunctionCompiler joinFilterFunctionCompiler,
IndexJoinLookupStats indexJoinLookupStats,
CompilerConfig compilerConfig,
TaskManagerConfig taskManagerConfig)
TaskManagerConfig taskManagerConfig,
SpillerFactory spillerFactory)
{
requireNonNull(compilerConfig, "compilerConfig is null");
this.queryPerformanceFetcher = requireNonNull(queryPerformanceFetcher, "queryPerformanceFetcher is null");
Expand All @@ -267,6 +270,7 @@ public LocalExecutionPlanner(
this.joinFilterFunctionCompiler = requireNonNull(joinFilterFunctionCompiler, "compiler is null");
this.indexJoinLookupStats = requireNonNull(indexJoinLookupStats, "indexJoinLookupStats is null");
this.maxIndexMemorySize = requireNonNull(taskManagerConfig, "taskManagerConfig is null").getMaxIndexMemoryUsage();
this.spillerFactory = requireNonNull(spillerFactory, "spillerFactory is null");
this.maxPartialAggregationMemorySize = taskManagerConfig.getMaxPartialAggregationMemoryUsage();
this.maxPagePartitioningBufferSize = taskManagerConfig.getMaxPagePartitioningBufferSize();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@
import com.facebook.presto.spi.block.BlockEncodingSerde;
import com.facebook.presto.spi.connector.ConnectorFactory;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spiller.BinarySpillerFactory;
import com.facebook.presto.spiller.SpillerFactory;
import com.facebook.presto.split.PageSinkManager;
import com.facebook.presto.split.PageSourceManager;
import com.facebook.presto.split.SplitManager;
Expand Down Expand Up @@ -208,6 +210,7 @@ public class LocalQueryRunner
private final NodePartitioningManager nodePartitioningManager;
private final PageSinkManager pageSinkManager;
private final TransactionManager transactionManager;
private final SpillerFactory spillerFactory;

private final ExpressionCompiler expressionCompiler;
private final JoinFilterFunctionCompiler joinFilterFunctionCompiler;
Expand Down Expand Up @@ -334,6 +337,8 @@ private LocalQueryRunner(Session defaultSession, FeaturesConfig featuresConfig,
.put(Commit.class, new CommitTask())
.put(Rollback.class, new RollbackTask())
.build();

this.spillerFactory = new BinarySpillerFactory(blockEncodingSerde);
}

public static LocalQueryRunner queryRunnerWithInitialTransaction(Session defaultSession)
Expand Down Expand Up @@ -560,7 +565,8 @@ public List<Driver> createDrivers(Session session, @Language("SQL") String sql,
joinFilterFunctionCompiler,
new IndexJoinLookupStats(),
new CompilerConfig().setInterpreterEnabled(false), // make sure tests fail if compiler breaks
new TaskManagerConfig().setTaskConcurrency(4));
new TaskManagerConfig().setTaskConcurrency(4),
spillerFactory);

// plan query
LocalExecutionPlan localExecutionPlan = executionPlanner.plan(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.facebook.presto.OutputBuffers;
import com.facebook.presto.ScheduledSplit;
import com.facebook.presto.TaskSource;
import com.facebook.presto.block.BlockEncodingManager;
import com.facebook.presto.connector.ConnectorId;
import com.facebook.presto.execution.TestSqlTaskManager.MockExchangeClientSupplier;
import com.facebook.presto.execution.scheduler.LegacyNetworkTopology;
Expand All @@ -30,6 +31,7 @@
import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
import com.facebook.presto.spi.predicate.TupleDomain;
import com.facebook.presto.spi.type.Type;
import com.facebook.presto.spiller.BinarySpillerFactory;
import com.facebook.presto.split.PageSinkManager;
import com.facebook.presto.split.PageSourceManager;
import com.facebook.presto.sql.gen.ExpressionCompiler;
Expand Down Expand Up @@ -125,7 +127,8 @@ public static LocalExecutionPlanner createTestingPlanner()
new JoinFilterFunctionCompiler(metadata),
new IndexJoinLookupStats(),
new CompilerConfig(),
new TaskManagerConfig());
new TaskManagerConfig(),
new BinarySpillerFactory(new BlockEncodingManager(metadata.getTypeManager())));
}

public static TaskInfo updateTask(SqlTask sqlTask, List<TaskSource> taskSources, OutputBuffers outputBuffers)
Expand Down
Loading

0 comments on commit 71bb741

Please sign in to comment.