diff --git a/hetu-exchange-filesystem/pom.xml b/hetu-exchange-filesystem/pom.xml
new file mode 100644
index 000000000..006176423
--- /dev/null
+++ b/hetu-exchange-filesystem/pom.xml
@@ -0,0 +1,246 @@
+
+
+
+ presto-root
+ io.hetu.core
+ 1.8.0-SNAPSHOT
+
+ 4.0.0
+
+ hetu-exchange-filesystem
+ hetu-plugin
+
+
+ ${project.parent.basedir}
+ 2.17.2
+ 13.0
+ 2.0.1.Final
+ 1.10.19
+ 6.10
+ 0.8.2
+ 3.1.1
+
+
+
+
+ com.google.code.findbugs
+ jsr305
+ true
+
+
+ io.hetu.core
+ presto-main
+
+
+ io.hetu.core
+ hetu-transport
+
+
+ javax.validation
+ validation-api
+
+
+ javax.inject
+ javax.inject
+
+
+ org.checkerframework
+ checker-qual
+ 2.5.2
+
+
+ org.openjdk.jol
+ jol-core
+
+
+ commons-codec
+ commons-codec
+ runtime
+
+
+
+
+ io.hetu.core
+ presto-spi
+ provided
+
+
+ jackson-annotations
+ com.fasterxml.jackson.core
+
+
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ io.airlift
+ units
+ provided
+
+
+
+ io.airlift
+ configuration
+
+
+ io.airlift
+ concurrent
+
+
+ io.airlift
+ stats
+
+
+ io.airlift
+ bootstrap
+
+
+ io.airlift
+ log
+
+
+ com.google.guava
+ guava
+
+
+ com.google.inject
+ guice
+
+
+
+ org.apache.logging.log4j
+ log4j-api
+ ${dep.log4j.version}
+ runtime
+
+
+ org.apache.logging.log4j
+ log4j-core
+ ${dep.log4j.version}
+ runtime
+
+
+ org.eclipse.jetty
+ jetty-webapp
+ 9.4.46.v20220331
+ runtime
+
+
+ org.apache.hadoop
+ hadoop-distcp
+ runtime
+
+
+ jackson-databind
+
+
+ jackson-annotations
+ com.fasterxml.jackson.core
+
+
+ com.fasterxml.jackson.core
+ runtime
+
+
+ io.prestosql.hadoop
+ hadoop-apache
+
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+ org.weakref
+ jmxutils
+ ${dep.jmxutils.version}
+ compile
+
+
+
+ io.hetu.core
+ presto-tests
+ test
+
+
+ plexus-cipher
+ org.sonatype.plexus
+
+
+ plexus-classworlds
+ org.codehaus.plexus
+
+
+
+
+
+ io.hetu.core
+ hetu-metastore
+ test
+
+
+
+ io.airlift
+ testing-mysql-server
+ test
+
+
+
+ org.mockito
+ mockito-core
+ ${version.mockito-all}
+ test
+
+
+
+ io.hetu.core
+ presto-plugin-toolkit
+
+
+
+ org.apache.commons
+ commons-lang3
+
+
+
+ com.esotericsoftware
+ kryo
+
+
+ objenesis
+ org.objenesis
+
+
+
+
+
+
+
+
+ org.gaul
+ modernizer-maven-plugin
+ 1.7.1
+
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeConfig.java b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeConfig.java
new file mode 100644
index 000000000..323ff8d01
--- /dev/null
+++ b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeConfig.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.hetu.core.plugin.exchange.filesystem;
+
+import com.google.common.collect.ImmutableList;
+import io.airlift.configuration.Config;
+import io.airlift.configuration.ConfigDescription;
+import io.airlift.units.DataSize;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotEmpty;
+import javax.validation.constraints.NotNull;
+
+import java.net.URI;
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static io.airlift.units.DataSize.Unit.GIGABYTE;
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static io.hetu.core.plugin.exchange.filesystem.FileSystemExchangeManager.PATH_SEPARATOR;
+
+/**
+ * Configuration of the file system based exchange: the directories exchange data is
+ * written to, whether it is encrypted, and the sizing of sinks and sources.
+ */
+public class FileSystemExchangeConfig
+{
+    private List<URI> baseDirectories = ImmutableList.of();
+    private boolean exchangeEncryptionEnabled = true;
+
+    private DataSize maxPageStorageSize = new DataSize(16, MEGABYTE);
+    private int exchangeSinkBufferPoolMinSize = 10;
+    private int exchangeSinkBuffersPerPartition = 2;
+    private DataSize exchangeSinkMaxFileSize = new DataSize(1, GIGABYTE);
+    private int exchangeSourceConcurrentReaders = 4;
+    private int maxOutputPartitionCount = 50;
+    private int exchangeFileListingParallelism = 50;
+
+    /**
+     * @return the configured base directories, each ending with {@code PATH_SEPARATOR};
+     *         empty until {@link #setBaseDirectories(String)} is called with at least one entry
+     */
+    @NotNull
+    @NotEmpty(message = "At least one base directory needs to be configured")
+    public List<URI> getBaseDirectories()
+    {
+        return baseDirectories;
+    }
+
+    /**
+     * Parses a comma separated list of base directories.
+     * <p>
+     * Whitespace around each entry is removed and blank entries are skipped, so values such
+     * as {@code "a, b"} or {@code "a,,b"} neither fail in {@link URI#create(String)} nor
+     * register the bare root directory. A {@code null} or empty argument leaves the current
+     * value untouched.
+     *
+     * @param baseDirectories comma separated directory URIs
+     * @return this config, for chaining
+     * @throws IllegalArgumentException if an entry is not a valid URI
+     */
+    @Config("exchange.base-directories")
+    @ConfigDescription("List of base directories separated by comma")
+    public FileSystemExchangeConfig setBaseDirectories(String baseDirectories)
+    {
+        if (!isNullOrEmpty(baseDirectories)) {
+            ImmutableList.Builder<URI> builder = ImmutableList.builder();
+            Arrays.stream(baseDirectories.split(","))
+                    .map(String::trim)
+                    .filter(dir -> !dir.isEmpty())
+                    // every directory is stored with a trailing separator
+                    .map(dir -> dir.endsWith(PATH_SEPARATOR) ? dir : dir + PATH_SEPARATOR)
+                    .map(URI::create)
+                    .forEach(builder::add);
+            this.baseDirectories = builder.build();
+        }
+        return this;
+    }
+
+    /**
+     * @return whether exchange encryption is enabled; defaults to {@code true}
+     */
+    public boolean isExchangeEncryptionEnabled()
+    {
+        return exchangeEncryptionEnabled;
+    }
+
+    @Config("exchange.encryption-enabled")
+    public FileSystemExchangeConfig setExchangeEncryptionEnabled(boolean exchangeEncryptionEnabled)
+    {
+        this.exchangeEncryptionEnabled = exchangeEncryptionEnabled;
+        return this;
+    }
+
+    /**
+     * @return the maximum storage size of a single page; defaults to 16 MB
+     */
+    public DataSize getMaxPageStorageSize()
+    {
+        return maxPageStorageSize;
+    }
+
+    @Config("exchange.max-page-storage-size")
+    @ConfigDescription("Max storage size of a page written to a sink, including the page itself and its size represented by an int")
+    public FileSystemExchangeConfig setMaxPageStorageSize(DataSize maxPageStorageSize)
+    {
+        this.maxPageStorageSize = maxPageStorageSize;
+        return this;
+    }
+
+    /**
+     * @return the minimum size of the sink buffer pool; defaults to 10
+     */
+    public int getExchangeSinkBufferPoolMinSize()
+    {
+        return exchangeSinkBufferPoolMinSize;
+    }
+
+    @Config("exchange.sink-buffer-pool-min-size")
+    public FileSystemExchangeConfig setExchangeSinkBufferPoolMinSize(int exchangeSinkBufferPoolMinSize)
+    {
+        this.exchangeSinkBufferPoolMinSize = exchangeSinkBufferPoolMinSize;
+        return this;
+    }
+
+    /**
+     * @return the number of sink buffers per partition; defaults to 2 and must be at least 2
+     */
+    @Min(2)
+    public int getExchangeSinkBuffersPerPartition()
+    {
+        return exchangeSinkBuffersPerPartition;
+    }
+
+    @Config("exchange.sink-buffers-per-partition")
+    public FileSystemExchangeConfig setExchangeSinkBuffersPerPartition(int exchangeSinkBuffersPerPartition)
+    {
+        this.exchangeSinkBuffersPerPartition = exchangeSinkBuffersPerPartition;
+        return this;
+    }
+
+    /**
+     * @return the maximum size of a file written by a sink; defaults to 1 GB
+     */
+    public DataSize getExchangeSinkMaxFileSize()
+    {
+        return exchangeSinkMaxFileSize;
+    }
+
+    @Config("exchange.sink-max-file-size")
+    @ConfigDescription("Max size of files written by sinks")
+    public FileSystemExchangeConfig setExchangeSinkMaxFileSize(DataSize exchangeSinkMaxFileSize)
+    {
+        this.exchangeSinkMaxFileSize = exchangeSinkMaxFileSize;
+        return this;
+    }
+
+    /**
+     * @return the number of concurrent readers per source; defaults to 4 and must be at least 1
+     */
+    @Min(1)
+    public int getExchangeSourceConcurrentReaders()
+    {
+        return exchangeSourceConcurrentReaders;
+    }
+
+    @Config("exchange.source-concurrent-readers")
+    public FileSystemExchangeConfig setExchangeSourceConcurrentReaders(int exchangeSourceConcurrentReaders)
+    {
+        this.exchangeSourceConcurrentReaders = exchangeSourceConcurrentReaders;
+        return this;
+    }
+
+    /**
+     * @return the maximum number of output partitions of an exchange; defaults to 50 and must be at least 1
+     */
+    @Min(1)
+    public int getMaxOutputPartitionCount()
+    {
+        return maxOutputPartitionCount;
+    }
+
+    @Config("exchange.max-output-partition-count")
+    public FileSystemExchangeConfig setMaxOutputPartitionCount(int maxOutputPartitionCount)
+    {
+        this.maxOutputPartitionCount = maxOutputPartitionCount;
+        return this;
+    }
+
+    /**
+     * @return the maximum parallelism of file listing calls; defaults to 50 and must be at least 1
+     */
+    @Min(1)
+    public int getExchangeFileListingParallelism()
+    {
+        return exchangeFileListingParallelism;
+    }
+
+    @Config("exchange.file-listing-parallelism")
+    @ConfigDescription("Max parallelism of file listing calls when enumerating spooling files.")
+    public FileSystemExchangeConfig setExchangeFileListingParallelism(int exchangeFileListingParallelism)
+    {
+        this.exchangeFileListingParallelism = exchangeFileListingParallelism;
+        return this;
+    }
+}
diff --git a/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeErrorCode.java b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeErrorCode.java
new file mode 100644
index 000000000..f72cd0f55
--- /dev/null
+++ b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeErrorCode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.hetu.core.plugin.exchange.filesystem;
+
+import io.prestosql.spi.ErrorCode;
+import io.prestosql.spi.ErrorCodeSupplier;
+import io.prestosql.spi.ErrorType;
+
+import static io.prestosql.spi.ErrorType.USER_ERROR;
+
+/**
+ * Error codes reported by the file system exchange.
+ */
+public enum FileSystemExchangeErrorCode
+        implements ErrorCodeSupplier
+{
+    MAX_OUTPUT_PARTITION_COUNT_EXCEEDED(0, USER_ERROR);
+
+    // Offset added to the per-constant code to form the published error code.
+    private static final int ERROR_CODE_OFFSET = 0x0510_0000;
+
+    private final ErrorCode errorCode;
+
+    /**
+     * @param code code of the constant, relative to the offset of this enum
+     * @param type category of the error
+     */
+    FileSystemExchangeErrorCode(int code, ErrorType type)
+    {
+        this.errorCode = new ErrorCode(ERROR_CODE_OFFSET + code, name(), type);
+    }
+
+    /**
+     * @return the error code built for this constant
+     */
+    @Override
+    public ErrorCode toErrorCode()
+    {
+        return errorCode;
+    }
+}
diff --git a/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeManager.java b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeManager.java
new file mode 100644
index 000000000..d7f5160fe
--- /dev/null
+++ b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeManager.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.hetu.core.plugin.exchange.filesystem;
+
+import com.google.common.collect.ImmutableList;
+import io.prestosql.spi.PrestoException;
+import io.prestosql.spi.exchange.Exchange;
+import io.prestosql.spi.exchange.ExchangeContext;
+import io.prestosql.spi.exchange.ExchangeManager;
+import io.prestosql.spi.exchange.ExchangeSink;
+import io.prestosql.spi.exchange.ExchangeSinkInstanceHandle;
+import io.prestosql.spi.exchange.ExchangeSource;
+import io.prestosql.spi.exchange.ExchangeSourceHandle;
+
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+import javax.inject.Inject;
+
+import java.net.URI;
+import java.security.NoSuchAlgorithmException;
+import java.util.AbstractMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static io.airlift.concurrent.Threads.daemonThreadsNamed;
+import static io.hetu.core.plugin.exchange.filesystem.FileSystemExchangeErrorCode.MAX_OUTPUT_PARTITION_COUNT_EXCEEDED;
+import static io.prestosql.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR;
+import static java.lang.Math.toIntExact;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.Executors.newCachedThreadPool;
+
+public class FileSystemExchangeManager
+ implements ExchangeManager
+{
+ public static final String PATH_SEPARATOR = "/";
+
+ private static final int KEY_BITS = 256;
+
+ private final FileSystemExchangeStorage exchangeStorage;
+ private final FileSystemExchangeStats stats;
+ private final List baseDirectories;
+ private final boolean exchangeEncryptionEnabled;
+ private final int maxPageStorageSizeInBytes;
+ private final int exchangeSinkBufferPoolMinSize;
+ private final int exchangeSinkBuffersPerPartition;
+ private final long exchangeSinkMaxFileSizeInBytes;
+ private final int exchangeSourceConcurrentReaders;
+ private final int maxOutputPartitionCount;
+ private final int exchangeFileListingParallelism;
+ private final ExecutorService executor;
+
+ @Inject
+ public FileSystemExchangeManager(
+ FileSystemExchangeStorage exchangeStorage,
+ FileSystemExchangeStats stats,
+ FileSystemExchangeConfig config)
+ {
+ requireNonNull(config, "config is null");
+ this.exchangeStorage = exchangeStorage;
+ this.stats = requireNonNull(stats, "stats is null");
+ this.baseDirectories = ImmutableList.copyOf(requireNonNull(config.getBaseDirectories(), "baseDirectories is null"));
+ this.exchangeEncryptionEnabled = config.isExchangeEncryptionEnabled();
+ this.maxPageStorageSizeInBytes = toIntExact(config.getMaxPageStorageSize().toBytes());
+ this.exchangeSinkBufferPoolMinSize = config.getExchangeSinkBufferPoolMinSize();
+ this.exchangeSinkBuffersPerPartition = config.getExchangeSinkBuffersPerPartition();
+ this.exchangeSinkMaxFileSizeInBytes = config.getExchangeSinkMaxFileSize().toBytes();
+ this.exchangeSourceConcurrentReaders = config.getExchangeSourceConcurrentReaders();
+ this.maxOutputPartitionCount = config.getMaxOutputPartitionCount();
+ this.exchangeFileListingParallelism = config.getExchangeFileListingParallelism();
+ this.executor = newCachedThreadPool(daemonThreadsNamed("exchange-source-handles-creation-%s"));
+ }
+
+ @Override
+ public Exchange createExchange(ExchangeContext context, int outputPartitionCount)
+ {
+ if (outputPartitionCount > maxOutputPartitionCount) {
+ throw new PrestoException(MAX_OUTPUT_PARTITION_COUNT_EXCEEDED,
+ format("Max output partition count exceeded for exchange %s. Allowed %s. Found: %s.", context.getExchangeId(), maxOutputPartitionCount, outputPartitionCount));
+ }
+ Optional secretKey = Optional.empty();
+ if (exchangeEncryptionEnabled) {
+ try {
+ KeyGenerator keyGenerator = KeyGenerator.getInstance("AES");
+ keyGenerator.init(KEY_BITS);
+ secretKey = Optional.of(keyGenerator.generateKey());
+ }
+ catch (NoSuchAlgorithmException e) {
+ throw new PrestoException(GENERIC_INTERNAL_ERROR, "Failed to generate secret key: " + e.getMessage(), e);
+ }
+ }
+ return new FileSystemExchange(
+ baseDirectories,
+ exchangeStorage,
+ stats,
+ context,
+ outputPartitionCount,
+ exchangeFileListingParallelism,
+ secretKey,
+ executor);
+ }
+
+ @Override
+ public ExchangeSink createSink(ExchangeSinkInstanceHandle handle, boolean preserveRecordsOrder)
+ {
+ FileSystemExchangeSinkInstanceHandle instanceHandle = (FileSystemExchangeSinkInstanceHandle) handle;
+ return new FileSystemExchangeSink(
+ exchangeStorage,
+ stats,
+ instanceHandle.getOutputDirectory(),
+ instanceHandle.getOutputPartitionCount(),
+ instanceHandle.getSinkHandle().getSecretKey().map(key -> new SecretKeySpec(key, 0, key.length, "AES")),
+ preserveRecordsOrder,
+ maxPageStorageSizeInBytes,
+ exchangeSinkBufferPoolMinSize,
+ exchangeSinkBuffersPerPartition,
+ exchangeSinkMaxFileSizeInBytes);
+ }
+
+ @Override
+ public ExchangeSource createSource(List handles)
+ {
+ List sourceFiles = handles.stream()
+ .map(FileSystemExchangeSourceHandle.class::cast)
+ .map(handle -> {
+ Optional secretKey = handle.getSecretKey().map(key -> new SecretKeySpec(key, 0, key.length, "AES"));
+ return new AbstractMap.SimpleEntry<>(handle, secretKey);
+ })
+ .flatMap(entry -> entry.getKey().getFiles().stream().map(fileStatus ->
+ new ExchangeSourceFile(
+ URI.create(fileStatus.getFilePath()),
+ entry.getValue(),
+ fileStatus.getFileSize())))
+ .collect(toImmutableList());
+ return new FileSystemExchangeSource(
+ exchangeStorage,
+ stats,
+ sourceFiles,
+ maxPageStorageSizeInBytes,
+ exchangeSourceConcurrentReaders);
+ }
+}
diff --git a/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeModule.java b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeModule.java
new file mode 100644
index 000000000..de890ebca
--- /dev/null
+++ b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeModule.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.hetu.core.plugin.exchange.filesystem;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.inject.Binder;
+import com.google.inject.Scopes;
+import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.hetu.core.plugin.exchange.filesystem.hdfs.HdfsFileSystemExchangeStorage;
+import io.hetu.core.plugin.exchange.filesystem.local.LocalFileSystemExchangeStorage;
+import io.prestosql.spi.PrestoException;
+
+import java.net.URI;
+import java.util.List;
+
+import static io.prestosql.spi.StandardErrorCode.CONFIGURATION_INVALID;
+import static io.prestosql.spi.StandardErrorCode.NOT_SUPPORTED;
+import static java.lang.String.format;
+import static org.weakref.jmx.guice.ExportBinder.newExporter;
+
+public class FileSystemExchangeModule
+ extends AbstractConfigurationAwareModule
+{
+ @Override
+ protected void setup(Binder binder)
+ {
+ binder.bind(FileSystemExchangeStats.class).in(Scopes.SINGLETON);
+ newExporter(binder).export(FileSystemExchangeStats.class).withGeneratedName();
+
+ List baseDirectories = buildConfigObject(FileSystemExchangeConfig.class).getBaseDirectories();
+ if (baseDirectories.stream().map(URI::getScheme).distinct().count() != 1) {
+ throw new PrestoException(CONFIGURATION_INVALID, "Multiple schemes in exchange base directories");
+ }
+ String scheme = baseDirectories.get(0).getScheme();
+ if (scheme == null || scheme.equals("file")) {
+ binder.bind(FileSystemExchangeStorage.class).to(LocalFileSystemExchangeStorage.class).in(Scopes.SINGLETON);
+ }
+ else if (ImmutableSet.of("hdfs").contains(scheme)) {
+ binder.bind(FileSystemExchangeStorage.class).to(HdfsFileSystemExchangeStorage.class).in(Scopes.SINGLETON);
+ }
+ else {
+ throw new PrestoException(NOT_SUPPORTED, format("Scheme %s is not supported as exchange storage", scheme));
+ }
+ binder.bind(FileSystemExchangeManager.class).in(Scopes.SINGLETON);
+ }
+}
diff --git a/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeSource.java b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeSource.java
new file mode 100644
index 000000000..edf3cbbe9
--- /dev/null
+++ b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeSource.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.hetu.core.plugin.exchange.filesystem;
+
+import com.google.common.collect.ImmutableList;
+import io.airlift.slice.Slice;
+import io.prestosql.spi.exchange.ExchangeSource;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.util.concurrent.Futures.nonCancellationPropagating;
+import static io.airlift.concurrent.MoreFutures.toCompletableFuture;
+import static io.airlift.concurrent.MoreFutures.whenAnyComplete;
+import static java.lang.Math.min;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * {@link ExchangeSource} that reads exchange files through a fixed set of
+ * {@link ExchangeStorageReader}s which all draw from one shared queue of source files.
+ */
+public class FileSystemExchangeSource
+        implements ExchangeSource
+{
+    private final FileSystemExchangeStats stats;
+    private final List<ExchangeStorageReader> readers;
+    private volatile CompletableFuture<?> blocked;
+    private volatile boolean closed;
+
+    /**
+     * @param exchangeStorage storage used to create the readers
+     * @param stats statistics the blocked time is recorded in
+     * @param sourceFiles files to read; may be empty, in which case the source is finished immediately
+     * @param maxPageStorageSize passed through to every reader
+     * @param exchangeSourceConcurrentReaders upper bound on the number of readers
+     * @throws NullPointerException if {@code exchangeStorage}, {@code stats} or {@code sourceFiles} is null
+     */
+    public FileSystemExchangeSource(
+            FileSystemExchangeStorage exchangeStorage,
+            FileSystemExchangeStats stats,
+            List<ExchangeSourceFile> sourceFiles,
+            int maxPageStorageSize,
+            int exchangeSourceConcurrentReaders)
+    {
+        requireNonNull(exchangeStorage, "exchangeStorage is null");
+        this.stats = requireNonNull(stats, "stats is null");
+        requireNonNull(sourceFiles, "sourceFiles is null");
+        // ArrayBlockingQueue rejects a capacity below 1, so an empty file list still gets capacity 1
+        Queue<ExchangeSourceFile> sourceFileQueue = new ArrayBlockingQueue<>(Math.max(1, sourceFiles.size()));
+        sourceFileQueue.addAll(sourceFiles);
+
+        int numOfReaders = min(sourceFiles.size(), exchangeSourceConcurrentReaders);
+
+        ImmutableList.Builder<ExchangeStorageReader> exchangeReaders = ImmutableList.builder();
+        for (int i = 0; i < numOfReaders; i++) {
+            exchangeReaders.add(exchangeStorage.createExchangeReader(sourceFileQueue, maxPageStorageSize));
+        }
+        this.readers = exchangeReaders.build();
+    }
+
+    /**
+     * @return {@code NOT_BLOCKED} when there are no readers or at least one reader is
+     *         unblocked; otherwise a future that completes once any reader unblocks
+     */
+    @Override
+    public CompletableFuture<?> isBlocked()
+    {
+        // without readers there is nothing to wait for, and whenAnyComplete would have no future to combine
+        if (readers.isEmpty()) {
+            return NOT_BLOCKED;
+        }
+        if (this.blocked != null && !this.blocked.isDone()) {
+            return this.blocked;
+        }
+        for (ExchangeStorageReader reader : readers) {
+            if (reader.isBlocked().isDone()) {
+                return NOT_BLOCKED;
+            }
+        }
+        synchronized (this) {
+            // re-check under the lock so concurrent callers share one future
+            if (this.blocked == null || this.blocked.isDone()) {
+                this.blocked = stats.getExchangeSourceBlocked().record(toCompletableFuture(
+                        nonCancellationPropagating(
+                                whenAnyComplete(readers.stream()
+                                        .map(ExchangeStorageReader::isBlocked)
+                                        .collect(toImmutableList())))));
+            }
+            return this.blocked;
+        }
+    }
+
+    /**
+     * @return {@code true} once the source is closed or every reader is finished
+     */
+    @Override
+    public boolean isFinished()
+    {
+        if (closed) {
+            return true;
+        }
+
+        for (ExchangeStorageReader reader : readers) {
+            if (!reader.isFinished()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Reads from the first reader that is neither blocked nor finished.
+     *
+     * @return what that reader returned, or {@code null} if the source is closed or no reader is ready
+     * @throws UncheckedIOException if the reader fails with an {@link IOException}
+     */
+    @Nullable
+    @Override
+    public Slice read()
+    {
+        if (closed) {
+            return null;
+        }
+
+        for (ExchangeStorageReader reader : readers) {
+            if (reader.isBlocked().isDone() && !reader.isFinished()) {
+                try {
+                    return reader.read();
+                }
+                catch (IOException e) {
+                    throw new UncheckedIOException(e);
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @return the sum of the retained sizes reported by all readers
+     */
+    @Override
+    public long getMemoryUsage()
+    {
+        long memoryUsage = 0;
+        for (ExchangeStorageReader reader : readers) {
+            memoryUsage += reader.getRetainedSize();
+        }
+        return memoryUsage;
+    }
+
+    /**
+     * Closes every reader; calls after the first one are no-ops.
+     */
+    @Override
+    public void close()
+    {
+        // Make sure we will only close once
+        synchronized (this) {
+            if (closed) {
+                return;
+            }
+            closed = true;
+        }
+
+        readers.forEach(ExchangeStorageReader::close);
+    }
+}
diff --git a/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java
new file mode 100644
index 000000000..f5ea69e7b
--- /dev/null
+++ b/hetu-exchange-filesystem/src/main/java/io/hetu/core/plugin/exchange/filesystem/FileSystemExchangeSourceHandle.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.hetu.core.plugin.exchange.filesystem;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import io.hetu.core.plugin.exchange.filesystem.util.HetuSizeOf;
+import io.prestosql.spi.exchange.ExchangeSourceHandle;
+import org.openjdk.jol.info.ClassLayout;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static io.hetu.core.plugin.exchange.filesystem.util.HetuSizeOf.estimatedSizeOf;
+import static io.hetu.core.plugin.exchange.filesystem.util.HetuSizeOf.sizeOf;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Serializable handle describing the files of one exchange partition and the optional
+ * secret key they are read with.
+ */
+public class FileSystemExchangeSourceHandle
+        implements ExchangeSourceHandle
+{
+    private static final int INSTANCE_SIZE = ClassLayout.parseClass(FileSystemExchangeSourceHandle.class).instanceSize();
+
+    private final int partitionId;
+    private final Optional<byte[]> secretKey;
+    private final List<FileStatus> files;
+
+    /**
+     * @param partitionId id of the partition the files belong to
+     * @param files files of the partition; copied into an immutable list
+     * @param secretKey raw key bytes, or empty when there is no key
+     * @throws NullPointerException if {@code files} or {@code secretKey} is null
+     */
+    @JsonCreator
+    public FileSystemExchangeSourceHandle(
+            @JsonProperty("partitionId") int partitionId,
+            @JsonProperty("files") List<FileStatus> files,
+            @JsonProperty("secretKey") Optional<byte[]> secretKey)
+    {
+        this.partitionId = partitionId;
+        this.files = ImmutableList.copyOf(requireNonNull(files, "files is null"));
+        this.secretKey = requireNonNull(secretKey, "secretKey is null");
+    }
+
+    @Override
+    @JsonProperty
+    public int getPartitionId()
+    {
+        return partitionId;
+    }
+
+    /**
+     * @return the instance size plus the estimated size of the files and of the secret key
+     */
+    @Override
+    public long getRetainedSizeInBytes()
+    {
+        return INSTANCE_SIZE
+                + estimatedSizeOf(files, FileStatus::getRetainedSizeInBytes)
+                + sizeOf(secretKey, HetuSizeOf::sizeOf);
+    }
+
+    @JsonProperty
+    public Optional<byte[]> getSecretKey()
+    {
+        return secretKey;
+    }
+
+    @JsonProperty
+    public List<FileStatus> getFiles()
+    {
+        return files;
+    }
+
+    /**
+     * Two handles are equal when they have the same partition id and either both lack a
+     * secret key or both carry keys with the same bytes. The files are not compared.
+     */
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        FileSystemExchangeSourceHandle that = (FileSystemExchangeSourceHandle) o;
+        if (partitionId != that.partitionId || secretKey.isPresent() != that.secretKey.isPresent()) {
+            return false;
+        }
+        return !secretKey.isPresent() || Arrays.equals(secretKey.get(), that.secretKey.get());
+    }
+
+    /**
+     * Hashes exactly what {@link #equals(Object)} compares: the partition id and the key
+     * content. Hashing the files, or the key array by identity, would give equal handles
+     * different hash codes.
+     */
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(partitionId, secretKey.map(Arrays::hashCode).orElse(0));
+    }
+
+    @Override
+    public String toString()
+    {
+        return toStringHelper(this)
+                .add("partitionId", partitionId)
+                .add("files", files)
+                // never print key material
+                .add("secretKey", secretKey.map(value -> "[REDACTED]"))
+                .toString();
+    }
+}
diff --git a/pom.xml b/pom.xml
index 34f1b551c..46ef38787 100644
--- a/pom.xml
+++ b/pom.xml
@@ -159,6 +159,7 @@
hetu-kylin
hetu-mpp
hetu-mariadb
+ hetu-exchange-filesystem
@@ -323,6 +324,12 @@
${project.version}
+
+ io.hetu.core
+ hetu-exchange-filesystem
+ ${project.version}
+
+
io.hetu.core
presto-local-file
diff --git a/presto-main/src/main/java/io/prestosql/exchange/ExchangeManagerModule.java b/presto-main/src/main/java/io/prestosql/exchange/ExchangeManagerModule.java
new file mode 100644
index 000000000..1522e7a2f
--- /dev/null
+++ b/presto-main/src/main/java/io/prestosql/exchange/ExchangeManagerModule.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.exchange;
+
+import com.google.inject.Binder;
+import com.google.inject.Module;
+import com.google.inject.Scopes;
+
+/**
+ * Guice module that binds {@link ExchangeManagerRegistry} as a singleton.
+ */
+public class ExchangeManagerModule
+ implements Module
+{
+ /**
+ * Registers the {@link ExchangeManagerRegistry} binding in singleton scope.
+ *
+ * @param binder binder the binding is added to
+ */
+ @Override
+ public void configure(Binder binder)
+ {
+ binder.bind(ExchangeManagerRegistry.class).in(Scopes.SINGLETON);
+ }
+}
diff --git a/presto-main/src/main/java/io/prestosql/exchange/ExchangeManagerRegistry.java b/presto-main/src/main/java/io/prestosql/exchange/ExchangeManagerRegistry.java
new file mode 100644
index 000000000..4c7a46a8e
--- /dev/null
+++ b/presto-main/src/main/java/io/prestosql/exchange/ExchangeManagerRegistry.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.exchange;
+
+import io.airlift.log.Logger;
+import io.prestosql.spi.classloader.ThreadContextClassLoader;
+import io.prestosql.spi.exchange.ExchangeHandleResolver;
+import io.prestosql.spi.exchange.ExchangeManager;
+import io.prestosql.spi.exchange.ExchangeManagerFactory;
+
+import javax.inject.Inject;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkState;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static io.airlift.configuration.ConfigurationLoader.loadPropertiesFrom;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Keeps the registered {@link ExchangeManagerFactory} instances and the single
+ * {@link ExchangeManager} created from one of them.
+ */
+public class ExchangeManagerRegistry
+{
+    private static final Logger log = Logger.get(ExchangeManagerRegistry.class);
+
+    private static final File CONFIG_FILE = new File("etc/exchange-manager.properties");
+    private static final String EXCHANGE_MANAGER_NAME_PROPERTY = "exchange-manager.name";
+
+    private final ExchangeHandleResolver handleResolver;
+
+    private final Map<String, ExchangeManagerFactory> exchangeManagerFactories = new ConcurrentHashMap<>();
+
+    private volatile ExchangeManager exchangeManager;
+
+    @Inject
+    public ExchangeManagerRegistry(ExchangeHandleResolver handleResolver)
+    {
+        this.handleResolver = requireNonNull(handleResolver, "handleResolver is null");
+    }
+
+    /**
+     * Registers a factory under its name.
+     *
+     * @throws IllegalArgumentException if a factory with the same name is already registered
+     */
+    public void addExchangeManagerFactory(ExchangeManagerFactory factory)
+    {
+        requireNonNull(factory, "factory is null");
+        if (exchangeManagerFactories.putIfAbsent(factory.getName(), factory) != null) {
+            throw new IllegalArgumentException(format("Exchange manager factory '%s' is already registered", factory.getName()));
+        }
+    }
+
+    /**
+     * Loads the exchange manager described by {@code etc/exchange-manager.properties}.
+     * Does nothing when that file does not exist.
+     *
+     * @throws IllegalArgumentException if the file does not name an exchange manager
+     * @throws UncheckedIOException if the file cannot be read
+     */
+    public void loadExchangeManager()
+    {
+        if (!CONFIG_FILE.exists()) {
+            return;
+        }
+
+        Map<String, String> properties = loadProperties(CONFIG_FILE);
+        // the name selects the factory; the remaining properties are handed to it
+        String name = properties.remove(EXCHANGE_MANAGER_NAME_PROPERTY);
+        checkArgument(!isNullOrEmpty(name), "Exchange manager configuration %s does not contain %s property", CONFIG_FILE, EXCHANGE_MANAGER_NAME_PROPERTY);
+
+        loadExchangeManager(name, properties);
+    }
+
+    /**
+     * Creates the exchange manager with the named factory and installs the factory's
+     * handle resolver.
+     *
+     * @param name name of a registered factory
+     * @param properties configuration passed to the factory
+     * @throws IllegalStateException if an exchange manager is already loaded
+     * @throws IllegalArgumentException if no factory is registered under {@code name}
+     */
+    public synchronized void loadExchangeManager(String name, Map<String, String> properties)
+    {
+        log.info("-- Loading exchange manager %s --", name);
+
+        checkState(exchangeManager == null, "exchangeManager is already loaded");
+
+        ExchangeManagerFactory factory = exchangeManagerFactories.get(name);
+        checkArgument(factory != null, "Exchange manager factory '%s' is not registered. Available factories: %s", name, exchangeManagerFactories.keySet());
+
+        ExchangeManager exchangeManagerInstance;
+        // create the manager with the factory's class loader as the thread context class loader
+        try (ThreadContextClassLoader ignored = new ThreadContextClassLoader(factory.getClass().getClassLoader())) {
+            exchangeManagerInstance = factory.create(properties);
+        }
+        handleResolver.setExchangeManagerHandleResolver(factory.getHandleResolver());
+
+        log.info("-- Loaded exchange manager %s --", name);
+
+        this.exchangeManager = exchangeManagerInstance;
+    }
+
+    /**
+     * @return the loaded exchange manager
+     * @throws IllegalStateException if no exchange manager has been loaded
+     */
+    public ExchangeManager getExchangeManager()
+    {
+        ExchangeManager exchangeManagerInstance = this.exchangeManager;
+        if (exchangeManagerInstance == null) {
+            throw new IllegalStateException("Exchange manager must be configured for the failure recovery capabilities to be fully functional");
+        }
+        return exchangeManagerInstance;
+    }
+
+    // Reads the configuration file into a mutable map so the name property can be removed.
+    private static Map<String, String> loadProperties(File configFile)
+    {
+        try {
+            return new HashMap<>(loadPropertiesFrom(configFile.getPath()));
+        }
+        catch (IOException e) {
+            throw new UncheckedIOException("Failed to read configuration file: " + configFile, e);
+        }
+    }
+}
diff --git a/presto-main/src/main/java/io/prestosql/execution/Lifespan.java b/presto-main/src/main/java/io/prestosql/execution/Lifespan.java
index ac108db6c..4dfe7c502 100644
--- a/presto-main/src/main/java/io/prestosql/execution/Lifespan.java
+++ b/presto-main/src/main/java/io/prestosql/execution/Lifespan.java
@@ -16,6 +16,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
+import org.openjdk.jol.info.ClassLayout;
import java.util.Objects;
@@ -25,6 +26,8 @@
public class Lifespan
{
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(Lifespan.class).instanceSize();
+
private static final Lifespan TASK_WIDE = new Lifespan(false, 0);
private final boolean grouped;
@@ -92,4 +95,9 @@ public int hashCode()
{
return Objects.hash(grouped, groupId);
}
+
+ /**
+ * Returns {@code INSTANCE_SIZE}, the instance size of {@code Lifespan} computed once by
+ * {@code ClassLayout.parseClass(Lifespan.class).instanceSize()}; nothing else is added.
+ */
+ public long getRetainedSizeInBytes()
+ {
+ return INSTANCE_SIZE;
+ }
}
diff --git a/presto-main/src/main/java/io/prestosql/server/PluginManager.java b/presto-main/src/main/java/io/prestosql/server/PluginManager.java
index 3f9ff137e..545a8e6f1 100644
--- a/presto-main/src/main/java/io/prestosql/server/PluginManager.java
+++ b/presto-main/src/main/java/io/prestosql/server/PluginManager.java
@@ -22,6 +22,7 @@
import io.prestosql.connector.ConnectorManager;
import io.prestosql.cube.CubeManager;
import io.prestosql.eventlistener.EventListenerManager;
+import io.prestosql.exchange.ExchangeManagerRegistry;
import io.prestosql.execution.resourcegroups.ResourceGroupManager;
import io.prestosql.failuredetector.FailureDetectorManager;
import io.prestosql.failuredetector.FailureDetectorPlugin;
@@ -40,6 +41,7 @@
import io.prestosql.spi.connector.ConnectorFactory;
import io.prestosql.spi.cube.CubeProvider;
import io.prestosql.spi.eventlistener.EventListenerFactory;
+import io.prestosql.spi.exchange.ExchangeManagerFactory;
import io.prestosql.spi.failuredetector.FailureRetryFactory;
import io.prestosql.spi.filesystem.HetuFileSystemClientFactory;
import io.prestosql.spi.function.FunctionNamespaceManagerFactory;
@@ -115,6 +117,9 @@ public class PluginManager
private final FileSystemClientManager fileSystemClientManager;
private final FailureDetectorManager failureDetectorManager;
private final HeuristicIndexerManager heuristicIndexerManager;
+
+ private final ExchangeManagerRegistry exchangeManagerRegistry;
+
private final SessionPropertyDefaults sessionPropertyDefaults;
private final ArtifactResolver resolver;
private final File installedPluginsDir;
@@ -142,7 +147,8 @@ public PluginManager(
FileSystemClientManager fileSystemClientManager,
HetuMetaStoreManager hetuMetaStoreManager,
HeuristicIndexerManager heuristicIndexerManager,
- FailureDetectorManager failureDetectorManager)
+ FailureDetectorManager failureDetectorManager,
+ ExchangeManagerRegistry exchangeManagerRegistry)
{
requireNonNull(nodeInfo, "nodeInfo is null");
requireNonNull(config, "config is null");
@@ -175,6 +181,26 @@ public PluginManager(
this.hetuMetaStoreManager = requireNonNull(hetuMetaStoreManager, "hetuMetaStoreManager is null");
this.heuristicIndexerManager = requireNonNull(heuristicIndexerManager, "heuristicIndexerManager is null");
this.failureDetectorManager = requireNonNull(failureDetectorManager, "failureDetectorManager is null");
+ this.exchangeManagerRegistry = requireNonNull(exchangeManagerRegistry, "exchangeManagerRegistry is null");
+ }
+
+ /**
+ * Lists the entries of {@code installedPluginsDir} in sorted order.
+ *
+ * @param installedPluginsDir directory to list; may be {@code null}
+ * @return an immutable list of the sorted entries, or an empty list when the argument is {@code null},
+ * is not a directory, or {@code listFiles()} returns {@code null}
+ */
+ private static List listFiles(File installedPluginsDir)
+ {
+ if (installedPluginsDir != null && installedPluginsDir.isDirectory()) {
+ File[] files = installedPluginsDir.listFiles();
+ // listFiles() can return null, which is treated like an empty directory
+ if (files != null) {
+ Arrays.sort(files);
+ return ImmutableList.copyOf(files);
+ }
+ }
+ return ImmutableList.of();
+ }
+
+ /**
+ * Returns a new list containing {@code artifacts} sorted by {@code Artifact::getFile}.
+ * The input list is copied first and is not modified.
+ *
+ * @param artifacts artifacts to sort
+ * @return a new {@link ArrayList} ordered by the natural ordering of each artifact's file, with
+ * artifacts whose file is {@code null} placed last
+ */
+ private static List sortedArtifacts(List artifacts)
+ {
+ List list = new ArrayList<>(artifacts);
+ Collections.sort(list, Ordering.natural().nullsLast().onResultOf(Artifact::getFile));
+ return list;
+ }
public void loadPlugins()
@@ -357,6 +383,11 @@ public void installPlugin(Plugin plugin)
FailureDetectorManager.addFailureRetryFactory(failureRetryFactory);
}
+ for (ExchangeManagerFactory exchangeManagerFactory : plugin.getExchangeManagerFactories()) {
+ log.info("Registering exchange manager %s", exchangeManagerFactory.getName());
+ exchangeManagerRegistry.addExchangeManagerFactory(exchangeManagerFactory);
+ }
+
installFunctionsPlugin(plugin);
}
@@ -429,23 +460,4 @@ private URLClassLoader createClassLoader(List urls)
ClassLoader parent = getClass().getClassLoader();
return new PluginClassLoader(urls, parent, SPI_PACKAGES);
}
-
- private static List listFiles(File installedPluginsDir)
- {
- if (installedPluginsDir != null && installedPluginsDir.isDirectory()) {
- File[] files = installedPluginsDir.listFiles();
- if (files != null) {
- Arrays.sort(files);
- return ImmutableList.copyOf(files);
- }
- }
- return ImmutableList.of();
- }
-
- private static List sortedArtifacts(List artifacts)
- {
- List list = new ArrayList<>(artifacts);
- Collections.sort(list, Ordering.natural().nullsLast().onResultOf(Artifact::getFile));
- return list;
- }
}
diff --git a/presto-main/src/main/java/io/prestosql/server/PrestoServer.java b/presto-main/src/main/java/io/prestosql/server/PrestoServer.java
index 96d584f34..7c5db318c 100755
--- a/presto-main/src/main/java/io/prestosql/server/PrestoServer.java
+++ b/presto-main/src/main/java/io/prestosql/server/PrestoServer.java
@@ -40,6 +40,8 @@
import io.prestosql.dynamicfilter.DynamicFilterListener;
import io.prestosql.eventlistener.EventListenerManager;
import io.prestosql.eventlistener.EventListenerModule;
+import io.prestosql.exchange.ExchangeManagerModule;
+import io.prestosql.exchange.ExchangeManagerRegistry;
import io.prestosql.execution.resourcegroups.ResourceGroupManager;
import io.prestosql.execution.scheduler.NodeSchedulerConfig;
import io.prestosql.execution.warnings.WarningCollectorModule;
@@ -131,7 +133,8 @@ public void run()
new EventListenerModule(),
new ServerMainModule(sqlParserOptions),
new NodeStateChangeModule(),
- new WarningCollectorModule());
+ new WarningCollectorModule(),
+ new ExchangeManagerModule());
modules.addAll(getAdditionalModules());
@@ -194,6 +197,8 @@ public void run()
injector.getInstance(ServerInfoResource.class).startupComplete();
+ injector.getInstance(ExchangeManagerRegistry.class).loadExchangeManager();
+
log.info("======== SERVER STARTED ========");
}
catch (Throwable e) {
diff --git a/presto-main/src/main/java/io/prestosql/server/ServerMainModule.java b/presto-main/src/main/java/io/prestosql/server/ServerMainModule.java
index 9bb7cd937..a349f517f 100644
--- a/presto-main/src/main/java/io/prestosql/server/ServerMainModule.java
+++ b/presto-main/src/main/java/io/prestosql/server/ServerMainModule.java
@@ -58,6 +58,7 @@
import io.prestosql.execution.QueryManagerConfig;
import io.prestosql.execution.SqlTaskManager;
import io.prestosql.execution.StageInfo;
+import io.prestosql.execution.TableExecuteContextManager;
import io.prestosql.execution.TaskInfo;
import io.prestosql.execution.TaskManagementExecutor;
import io.prestosql.execution.TaskManager;
@@ -344,6 +345,7 @@ protected void setup(Binder binder)
binder.bind(TaskManagementExecutor.class).in(Scopes.SINGLETON);
binder.bind(SqlTaskManager.class).in(Scopes.SINGLETON);
binder.bind(TaskManager.class).to(Key.get(SqlTaskManager.class));
+ binder.bind(TableExecuteContextManager.class).in(Scopes.SINGLETON);
// memory revoking scheduler
binder.bind(MemoryRevokingScheduler.class).in(Scopes.SINGLETON);
diff --git a/presto-main/src/test/java/io/prestosql/exchange/ExchangeManagerRegistryTest.java b/presto-main/src/test/java/io/prestosql/exchange/ExchangeManagerRegistryTest.java
new file mode 100644
index 000000000..a458974db
--- /dev/null
+++ b/presto-main/src/test/java/io/prestosql/exchange/ExchangeManagerRegistryTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.exchange;
+
+import io.airlift.log.Logger;
+import io.prestosql.server.testing.TestingPrestoServer;
+import io.prestosql.spi.exchange.ExchangeHandleResolver;
+import io.prestosql.spi.exchange.ExchangeManager;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import static io.airlift.configuration.ConfigurationLoader.loadPropertiesFrom;
+import static org.testng.Assert.assertNotNull;
+
+/**
+ * Smoke test that starts a {@link TestingPrestoServer} with the properties read from
+ * {@code etc/exchange-manager.properties} and then loads an exchange manager through a
+ * separately constructed {@link ExchangeManagerRegistry}.
+ */
+public class ExchangeManagerRegistryTest
+{
+ private static final Logger LOG = Logger.get(ExchangeManagerRegistryTest.class);
+ private static final File CONFIG_FILE = new File("etc/exchange-manager.properties");
+
+ /**
+ * Loads the exchange manager and asserts that the registry returns a non-null instance.
+ *
+ * NOTE(review): the catch block only logs the exception message, so any {@link Exception} thrown while
+ * starting the server, loading the exchange manager or fetching it lets the test pass. Only an
+ * {@link AssertionError} from {@code assertNotNull} (not an {@code Exception}) would fail it. Confirm
+ * whether this best-effort behaviour is intended.
+ */
+ @Test
+ public void testLoadExchangeManager() throws IOException
+ {
+ Map properties = loadPropertiesFrom(CONFIG_FILE.getPath());
+ try (TestingPrestoServer server = new TestingPrestoServer(properties)) {
+ // the registry under test is created here with a fresh resolver, not obtained from the server
+ ExchangeManagerRegistry exchangeManagerRegistry = new ExchangeManagerRegistry(new ExchangeHandleResolver());
+ exchangeManagerRegistry.loadExchangeManager();
+ ExchangeManager exchangeManager = exchangeManagerRegistry.getExchangeManager();
+ assertNotNull(exchangeManager);
+ }
+ catch (Exception e) {
+ LOG.info(e.getMessage());
+ }
+ }
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/EstimateSizeUtil.java b/presto-spi/src/main/java/io/prestosql/spi/EstimateSizeUtil.java
new file mode 100644
index 000000000..75d7e4a74
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/EstimateSizeUtil.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi;
+
+import org.openjdk.jol.info.ClassLayout;
+import sun.misc.Unsafe;
+
+import java.util.AbstractMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.ToLongFunction;
+
+/**
+ * Static helpers that estimate the heap size, in bytes, of strings, optionals and collections.
+ * Collection estimates charge one object-array slot per element plus whatever the supplied
+ * size functions report; they are approximations, not exact measurements.
+ */
+public class EstimateSizeUtil
+{
+    private static final int SIMPLE_ENTRY_INSTANCE_SIZE = ClassLayout.parseClass(AbstractMap.SimpleEntry.class).instanceSize();
+    private static final int STRING_INSTANCE_SIZE = ClassLayout.parseClass(String.class).instanceSize();
+    private static final int OPTIONAL_INSTANCE_SIZE = ClassLayout.parseClass(Optional.class).instanceSize();
+
+    private EstimateSizeUtil()
+    {
+    }
+
+    /**
+     * Estimates the size of a map.
+     *
+     * @param map map to measure; {@code null} yields 0
+     * @param keySize function returning the size of a key
+     * @param valueSize function returning the size of a value
+     * @return an object array of {@code map.size()} slots plus, per entry, one
+     * {@code SimpleEntry} instance and the reported key and value sizes
+     */
+    public static <K, V> long estimatedSizeOf(Map<K, V> map, ToLongFunction<K> keySize, ToLongFunction<V> valueSize)
+    {
+        if (map == null) {
+            return 0L;
+        }
+
+        long result = sizeOfObjectArray(map.size());
+        Iterator<Map.Entry<K, V>> entries = map.entrySet().iterator();
+        while (entries.hasNext()) {
+            Map.Entry<K, V> entry = entries.next();
+            result += SIMPLE_ENTRY_INSTANCE_SIZE
+                    + keySize.applyAsLong(entry.getKey())
+                    + valueSize.applyAsLong(entry.getValue());
+        }
+        return result;
+    }
+
+    /**
+     * Returns the size of an {@code Object[]} with {@code length} slots: the array base offset
+     * plus the per-reference index scale times {@code length}.
+     */
+    public static long sizeOfObjectArray(int length)
+    {
+        return (long) Unsafe.ARRAY_OBJECT_BASE_OFFSET + (long) Unsafe.ARRAY_OBJECT_INDEX_SCALE * (long) length;
+    }
+
+    /**
+     * Estimates the size of an optional.
+     *
+     * @param optional optional to measure; {@code null} or empty yields 0
+     * @param valueSize function returning the size of the contained value
+     * @return the {@code Optional} instance size plus the reported value size, or 0
+     */
+    public static <T> long sizeOf(Optional<T> optional, ToLongFunction<T> valueSize)
+    {
+        if (optional == null || !optional.isPresent()) {
+            return 0L;
+        }
+        return (long) OPTIONAL_INSTANCE_SIZE + valueSize.applyAsLong(optional.get());
+    }
+
+    /**
+     * Estimates the size of a list.
+     *
+     * @param list list to measure; {@code null} yields 0
+     * @param valueSize function returning the size of an element
+     * @return an object array of {@code list.size()} slots plus the reported element sizes
+     */
+    public static <T> long estimatedSizeOf(List<T> list, ToLongFunction<T> valueSize)
+    {
+        if (list == null) {
+            return 0L;
+        }
+
+        long result = sizeOfObjectArray(list.size());
+        Iterator<T> values = list.iterator();
+        while (values.hasNext()) {
+            result += valueSize.applyAsLong(values.next());
+        }
+        return result;
+    }
+
+    /**
+     * Estimates the size of a string as the {@code String} instance size plus two bytes per char.
+     *
+     * @param string string to measure; {@code null} yields 0
+     */
+    public static long estimatedSizeOf(String string)
+    {
+        if (string == null) {
+            return 0L;
+        }
+        // widen before multiplying: length() * 2 overflows int for strings longer than 2^30 chars
+        return STRING_INSTANCE_SIZE + (long) string.length() * Character.BYTES;
+    }
+
+    /**
+     * Estimates the size of a set.
+     *
+     * @param set set to measure; {@code null} yields 0
+     * @param valueSize function returning the size of an element
+     * @return an object array of {@code set.size()} slots plus, per element, one
+     * {@code SimpleEntry} instance and the reported element size
+     */
+    public static <T> long estimatedSizeOf(Set<T> set, ToLongFunction<T> valueSize)
+    {
+        if (set == null) {
+            return 0L;
+        }
+
+        long result = sizeOfObjectArray(set.size());
+        Iterator<T> values = set.iterator();
+        while (values.hasNext()) {
+            result += SIMPLE_ENTRY_INSTANCE_SIZE + valueSize.applyAsLong(values.next());
+        }
+        return result;
+    }
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/HostAddress.java b/presto-spi/src/main/java/io/prestosql/spi/HostAddress.java
index ecf132aa5..4932a4723 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/HostAddress.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/HostAddress.java
@@ -15,6 +15,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
+import org.openjdk.jol.info.ClassLayout;
import java.net.InetAddress;
import java.net.URI;
@@ -23,6 +24,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static io.prestosql.spi.EstimateSizeUtil.estimatedSizeOf;
import static java.util.Objects.requireNonNull;
/**
@@ -58,6 +60,8 @@
*/
public class HostAddress
{
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(HostAddress.class).instanceSize();
+
/**
* Magic value indicating the absence of a port number.
*/
@@ -294,4 +298,9 @@ private static boolean isValidPort(int port)
{
return port >= 0 && port <= 65535;
}
+
+ /**
+ * Returns {@code INSTANCE_SIZE} (the {@code HostAddress} instance size from {@code ClassLayout})
+ * plus {@code estimatedSizeOf(host)} for the host string.
+ */
+ public long getRetainedSizeInBytes()
+ {
+ return INSTANCE_SIZE + estimatedSizeOf(host);
+ }
}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/Plugin.java b/presto-spi/src/main/java/io/prestosql/spi/Plugin.java
index d9d8c69f3..dd1809ced 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/Plugin.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/Plugin.java
@@ -17,6 +17,7 @@
import io.prestosql.spi.connector.ConnectorFactory;
import io.prestosql.spi.cube.CubeProvider;
import io.prestosql.spi.eventlistener.EventListenerFactory;
+import io.prestosql.spi.exchange.ExchangeManagerFactory;
import io.prestosql.spi.failuredetector.FailureRetryFactory;
import io.prestosql.spi.filesystem.HetuFileSystemClientFactory;
import io.prestosql.spi.function.FunctionNamespaceManagerFactory;
@@ -149,19 +150,28 @@ default Optional getConnectorWithProperties()
}
default void setExternalFunctionsDir(File externalFuncsDir)
- {}
+ {
+ }
default void setMaxFunctionRunningTimeEnable(boolean enable)
- {}
+ {
+ }
default void setMaxFunctionRunningTimeInSec(long time)
- {}
+ {
+ }
default void setFunctionRunningThreadPoolSize(int size)
- {}
+ {
+ }
default Iterable getFunctionNamespaceManagerFactories()
{
return emptyList();
}
+
+ /**
+ * Returns the exchange manager factories contributed by this plugin.
+ * The default implementation contributes none and returns an empty list.
+ */
+ default Iterable getExchangeManagerFactories()
+ {
+ return emptyList();
+ }
}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/SplitWeight.java b/presto-spi/src/main/java/io/prestosql/spi/SplitWeight.java
new file mode 100644
index 000000000..e269be218
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/SplitWeight.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+import org.openjdk.jol.info.ClassLayout;
+
+import java.math.BigDecimal;
+import java.util.Collection;
+import java.util.function.Function;
+
+import static java.lang.Math.addExact;
+import static java.lang.Math.multiplyExact;
+
+/**
+ * Weight of a split relative to a standard split, stored as a positive fixed-point {@code long}
+ * in which {@code UNIT_VALUE} (100) represents exactly one standard split.
+ */
+public class SplitWeight
+{
+    private static final int INSTANCE_SIZE = ClassLayout.parseClass(SplitWeight.class).instanceSize();
+
+    private static final long UNIT_VALUE = 100;
+    private static final int UNIT_SCALE = 2; // Decimal scale such that (10 ^ UNIT_SCALE) == UNIT_VALUE
+    private static final SplitWeight STANDARD_WEIGHT = new SplitWeight(UNIT_VALUE);
+
+    private final long value;
+
+    /**
+     * @param value raw weight; must be greater than zero
+     * @throws IllegalArgumentException if {@code value} is zero or negative
+     */
+    private SplitWeight(long value)
+    {
+        if (value <= 0) {
+            throw new IllegalArgumentException("value must be > 0, found: " + value);
+        }
+        this.value = value;
+    }
+
+    /**
+     * @return The internal integer representation for this weight value
+     */
+    @JsonValue
+    public long getRawValue()
+    {
+        return value;
+    }
+
+    @Override
+    public boolean equals(Object other)
+    {
+        if (!(other instanceof SplitWeight)) {
+            return false;
+        }
+        return this.value == ((SplitWeight) other).value;
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Long.hashCode(value);
+    }
+
+    /**
+     * Renders the weight as a plain decimal proportion of the standard weight, for example a raw
+     * value of 50 prints as {@code "0.5"} and a raw value of 250 prints as {@code "2.5"}.
+     */
+    @Override
+    public String toString()
+    {
+        if (value == UNIT_VALUE) {
+            return "1";
+        }
+        // BigDecimal.valueOf(unscaled, scale) is unscaled * 10^(-scale), so the scale has to be
+        // positive to divide the raw value by UNIT_VALUE; a negative scale would multiply by it.
+        return BigDecimal.valueOf(value, UNIT_SCALE).stripTrailingZeros().toPlainString();
+    }
+
+    /**
+     * Produces a {@link SplitWeight} from the raw internal value representation. This method is intended
+     * primarily for JSON deserialization, and connectors should not call this factory method directly
+     * to construct {@link SplitWeight} instances. Instead, connectors should use {@link SplitWeight#fromProportion(double)}
+     * to avoid breakages that could arise if {@link SplitWeight#UNIT_VALUE} changes in the future.
+     *
+     * @param value raw weight; must be greater than zero
+     * @return the shared standard instance when {@code value} equals the unit value, otherwise a new instance
+     * @throws IllegalArgumentException if {@code value} is zero or negative
+     */
+    @JsonCreator
+    public static SplitWeight fromRawValue(long value)
+    {
+        return value == UNIT_VALUE ? STANDARD_WEIGHT : new SplitWeight(value);
+    }
+
+    /**
+     * Produces a {@link SplitWeight} that corresponds to the {@link SplitWeight#standard()} weight
+     * proportionally, i.e., a parameter of {@code 1.0} will be equivalent to the standard weight
+     * and a value of {@code 0.5} will be 1/2 of the standard split weight. Valid arguments
+     * must be greater than zero and finite. Connectors should prefer constructing split weights
+     * using this factory method rather than passing a raw integer value in case the integer representation
+     * of a standard split needs to change in the future.
+     *
+     * @param weight the proportional weight relative to a standard split, expressed as a double
+     * @return a {@link SplitWeight} with a raw value corresponding to the requested proportion
+     * @throws IllegalArgumentException if {@code weight} is not finite or not greater than zero
+     */
+    public static SplitWeight fromProportion(double weight)
+    {
+        if (weight <= 0 || !Double.isFinite(weight)) {
+            throw new IllegalArgumentException("Invalid weight: " + weight);
+        }
+        // Must round up to avoid small weights rounding to 0
+        return fromRawValue((long) Math.ceil(weight * UNIT_VALUE));
+    }
+
+    /**
+     * @return the shared instance whose raw value equals the unit value
+     */
+    public static SplitWeight standard()
+    {
+        return STANDARD_WEIGHT;
+    }
+
+    /**
+     * Returns the raw weight of {@code splitCount} standard splits.
+     *
+     * @throws IllegalArgumentException if {@code splitCount} is negative
+     * @throws ArithmeticException if the product overflows a {@code long}
+     */
+    public static long rawValueForStandardSplitCount(int splitCount)
+    {
+        if (splitCount < 0) {
+            throw new IllegalArgumentException("splitCount must be >= 0, found: " + splitCount);
+        }
+        return multiplyExact(splitCount, UNIT_VALUE);
+    }
+
+    /**
+     * Sums the raw weights of all items in {@code collection}.
+     *
+     * @param collection items to sum over
+     * @param getter extracts the {@link SplitWeight} of an item
+     * @return the sum of the raw values, 0 for an empty collection
+     * @throws ArithmeticException if the sum overflows a {@code long}
+     */
+    public static <T> long rawValueSum(Collection<T> collection, Function<T, SplitWeight> getter)
+    {
+        long sum = 0;
+        for (T item : collection) {
+            long rawValue = getter.apply(item).getRawValue();
+            sum = addExact(sum, rawValue);
+        }
+        return sum;
+    }
+
+    /**
+     * @return the {@code SplitWeight} instance size computed by {@code ClassLayout}
+     */
+    public long getRetainedSizeInBytes()
+    {
+        return INSTANCE_SIZE;
+    }
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/StandardErrorCode.java b/presto-spi/src/main/java/io/prestosql/spi/StandardErrorCode.java
index 715115d1b..be6ce9660 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/StandardErrorCode.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/StandardErrorCode.java
@@ -99,7 +99,7 @@ public enum StandardErrorCode
SEED_STORE_FAILURE(0x0001_001B, INTERNAL_ERROR),
TOO_MANY_RESUMES(0x0001_001C, INTERNAL_ERROR),
QUERY_EXPIRE(0x0001_001D, INTERNAL_ERROR),
-
+ REMOTE_TASK_FAILED(0x0001_001E, INTERNAL_ERROR),
GENERIC_INSUFFICIENT_RESOURCES(0x0002_0000, INSUFFICIENT_RESOURCES),
EXCEEDED_GLOBAL_MEMORY_LIMIT(0x0002_0001, INSUFFICIENT_RESOURCES),
QUERY_QUEUE_FULL(0x0002_0002, INSUFFICIENT_RESOURCES),
@@ -109,8 +109,9 @@ public enum StandardErrorCode
EXCEEDED_SPILL_LIMIT(0x0002_0006, INSUFFICIENT_RESOURCES),
EXCEEDED_LOCAL_MEMORY_LIMIT(0x0002_0007, INSUFFICIENT_RESOURCES),
ADMINISTRATIVELY_PREEMPTED(0x0002_0008, INSUFFICIENT_RESOURCES),
+ EXCEEDED_TASK_DESCRIPTOR_STORAGE_CAPACITY(0x0002_0009, INSUFFICIENT_RESOURCES),
- CUBE_ERROR(0x0002_0009, INTERNAL_ERROR)
+ CUBE_ERROR(0x0002_000A, INTERNAL_ERROR)
/**/;
// Connectors can use error codes starting at the range 0x0100_0000
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/CatalogName.java b/presto-spi/src/main/java/io/prestosql/spi/connector/CatalogName.java
index 8d8a12e31..935e03291 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/CatalogName.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/connector/CatalogName.java
@@ -15,14 +15,18 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
+import org.openjdk.jol.info.ClassLayout;
import java.util.Objects;
import static com.google.common.base.Preconditions.checkArgument;
+import static io.prestosql.spi.EstimateSizeUtil.estimatedSizeOf;
import static java.util.Objects.requireNonNull;
public final class CatalogName
{
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(CatalogName.class).instanceSize();
+
private static final String INFORMATION_SCHEMA_CONNECTOR_PREFIX = "$info_schema@";
private static final String SYSTEM_TABLES_CONNECTOR_PREFIX = "$system@";
@@ -81,4 +85,10 @@ public static CatalogName createSystemTablesCatalogName(CatalogName catalogName)
{
return new CatalogName(SYSTEM_TABLES_CONNECTOR_PREFIX + catalogName.getCatalogName());
}
+
+ /**
+ * Returns {@code INSTANCE_SIZE} (the {@code CatalogName} instance size from {@code ClassLayout})
+ * plus {@code estimatedSizeOf(catalogName)} for the name string.
+ */
+ public long getRetainedSizeInBytes()
+ {
+ return INSTANCE_SIZE
+ + estimatedSizeOf(catalogName);
+ }
}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplit.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplit.java
index 3cf1eb8c1..684b362a0 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplit.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplit.java
@@ -14,6 +14,7 @@
package io.prestosql.spi.connector;
import io.prestosql.spi.HostAddress;
+import io.prestosql.spi.SplitWeight;
import sun.reflect.generics.reflectiveObjects.NotImplementedException;
import java.util.List;
@@ -61,8 +62,18 @@ default int getSplitCount()
return 1;
}
+ /**
+ * Returns the weight of this split. The default implementation returns {@link SplitWeight#standard()}.
+ */
+ default SplitWeight getSplitWeight()
+ {
+ return SplitWeight.standard();
+ }
+
default List getUnwrappedSplits()
{
throw new NotImplementedException();
}
+
+ /**
+ * Returns the retained size of this split in bytes. The default implementation returns 0, so
+ * implementations that do not override it report no size.
+ */
+ default long getRetainedSizeInBytes()
+ {
+ return 0;
+ }
}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplitSource.java b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplitSource.java
index 91bbe3024..818295dc4 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplitSource.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/connector/ConnectorSplitSource.java
@@ -15,6 +15,7 @@
import java.io.Closeable;
import java.util.List;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import static java.util.Objects.requireNonNull;
@@ -63,4 +64,9 @@ default List groupSmallSplits(List pendingSplits
{
return pendingSplits;
}
+
+ /**
+ * The default implementation returns {@link Optional#empty()}.
+ * NOTE(review): presumably carries connector-specific information about the splits of a table
+ * execute operation; confirm against the callers.
+ */
+ default Optional> getTableExecuteSplitsInfo()
+ {
+ return Optional.empty();
+ }
}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/Exchange.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/Exchange.java
new file mode 100644
index 000000000..63a89b903
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/Exchange.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+import java.io.Closeable;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+public interface Exchange
+ extends Closeable
+{
+ /**
+ * Add a new sink
+ *
+ * @param taskPartitionId unique partition written to a sink
+ * @return {@link ExchangeSinkHandle} associated with the taskPartitionId
+ */
+ ExchangeSinkHandle addSink(int taskPartitionId);
+
+ /**
+ * Called when no more sinks will be added with {@link #addSink(int)}
+ */
+ void noMoreSinks();
+
+ /**
+ * Registers a sink instance for a task attempt.
+ *
+ * @param sinkHandle - handle returned by {@link #addSink(int)}
+ * @param taskAttemptId - attempt id (how many times attempted)
+ * @return ExchangeSinkInstanceHandle to be sent to a worker that is needed to create an {@link ExchangeSink} instance
+ * with {@link ExchangeManager#createSink(ExchangeSinkInstanceHandle, boolean)}
+ */
+ ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId);
+
+ /**
+ * NOTE(review): presumably called once the sink instance identified by {@code handle} has finished
+ * writing; confirm against the implementations.
+ *
+ * @param handle sink instance handle, as returned by {@link #instantiateSink(ExchangeSinkHandle, int)}
+ */
+ void sinkFinished(ExchangeSinkInstanceHandle handle);
+
+ /**
+ * Returns a future that yields the source handles of this exchange.
+ * NOTE(review): when the future completes is not visible here; confirm against the implementations.
+ */
+ CompletableFuture> getSourceHandles();
+
+ /**
+ * Returns a splitter for the given source handle.
+ * NOTE(review): presumably {@code targetSizeInBytes} is the desired size of each resulting piece; confirm.
+ */
+ ExchangeSourceSplitter split(ExchangeSourceHandle handle, long targetSizeInBytes);
+
+ /**
+ * Returns the statistics for the given source handle.
+ */
+ ExchangeSourceStatistics getExchangeSourceStatistics(ExchangeSourceHandle handle);
+
+ /**
+ * Redeclares {@link Closeable#close()} without a {@code throws} clause, so callers need not
+ * handle {@code IOException}.
+ */
+ @Override
+ void close();
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeContext.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeContext.java
new file mode 100644
index 000000000..d7f1456d0
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeContext.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+import io.prestosql.spi.QueryId;
+
+/**
+ * Immutable pair of a {@link QueryId} and an {@link ExchangeId}, passed to
+ * {@code ExchangeManager.createExchange}.
+ */
+public class ExchangeContext
+{
+ private final QueryId queryId;
+ private final ExchangeId exchangeId;
+
+ /**
+ * Stores both ids as given; no null checks are performed.
+ */
+ public ExchangeContext(QueryId queryId, ExchangeId exchangeId)
+ {
+ this.queryId = queryId;
+ this.exchangeId = exchangeId;
+ }
+
+ /**
+ * @return the query id supplied to the constructor
+ */
+ public QueryId getQueryId()
+ {
+ return queryId;
+ }
+
+ /**
+ * @return the exchange id supplied to the constructor
+ */
+ public ExchangeId getExchangeId()
+ {
+ return exchangeId;
+ }
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeHandleResolver.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeHandleResolver.java
new file mode 100644
index 000000000..660e46317
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeHandleResolver.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkState;
+
+/**
+ * Holds a single {@link ExchangeManagerHandleResolver} and delegates handle-class lookups to it.
+ * The resolver is kept in an {@link AtomicReference} and can be installed only once.
+ */
+public class ExchangeHandleResolver
+{
+ private final AtomicReference exchangeManagerHandleResolver = new AtomicReference<>();
+
+ /**
+ * Installs the resolver.
+ *
+ * @throws IllegalStateException if a resolver has already been set
+ */
+ public void setExchangeManagerHandleResolver(ExchangeManagerHandleResolver resolver)
+ {
+ // compareAndSet(null, resolver) succeeds only while the reference is still unset
+ checkState(exchangeManagerHandleResolver.compareAndSet(null, resolver), "ExchangeManagerHandleResolver is already set");
+ }
+
+ /**
+ * Returns the sink instance handle class reported by the installed resolver.
+ *
+ * @throws IllegalStateException if no resolver has been set
+ */
+ public Class extends ExchangeSinkInstanceHandle> getExchangeSinkInstanceHandleClass()
+ {
+ ExchangeManagerHandleResolver resolver = exchangeManagerHandleResolver.get();
+ checkState(resolver != null, "ExchangeManagerHandleResolver is not set");
+ return resolver.getExchangeSinkInstanceHandleClass();
+ }
+
+ /**
+ * Returns the source handle class reported by the installed resolver.
+ *
+ * @throws IllegalStateException if no resolver has been set
+ */
+ public Class extends ExchangeSourceHandle> getExchangeSourceHandleClass()
+ {
+ ExchangeManagerHandleResolver resolver = exchangeManagerHandleResolver.get();
+ checkState(resolver != null, "ExchangeManagerHandleResolver is not set");
+ return resolver.getExchangeSourceHandleClass();
+ }
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeId.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeId.java
new file mode 100644
index 000000000..f9d9bb05a
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeId.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+
+import java.util.Objects;
+import java.util.regex.Pattern;
+
+import static java.util.Objects.requireNonNull;
+import static java.util.UUID.randomUUID;
+
+public class ExchangeId
+{
+ private static final Pattern ID_PATTERN = Pattern.compile("[a-zA-Z\\d_-]+");
+
+ private final String id;
+
+ @JsonCreator
+ public ExchangeId(String id)
+ {
+ requireNonNull(id, "id is null");
+ if (!ID_PATTERN.matcher(id).matches()) {
+ throw new IllegalArgumentException("Invalid exchange id: " + id);
+ }
+ this.id = id;
+ }
+
+ public static ExchangeId createRandomExchangeId()
+ {
+ return new ExchangeId(randomUUID().toString());
+ }
+
+ @JsonValue
+ public String getId()
+ {
+ return id;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ExchangeId that = (ExchangeId) o;
+ return Objects.equals(id, that.id);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(id);
+ }
+
+ @Override
+ public String toString()
+ {
+ return id;
+ }
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeManager.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeManager.java
new file mode 100644
index 000000000..76c8c38b5
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeManager.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.util.List;
+
+/**
+ * Service provider interface for an external exchange.
+ * It is used to exchange data between stages.
+ */
+@ThreadSafe
+public interface ExchangeManager
+{
+    /**
+     * Creates an external exchange between a pair of stages.
+     *
+     * @param context information about the query and stage being executed
+     * @param outputPartitionCount number of distinct partitions to be created by the exchange
+     * @return {@link Exchange} instance to be used by coordinator to interact with the external exchange
+     */
+    Exchange createExchange(ExchangeContext context, int outputPartitionCount);
+
+    /**
+     * Creates an {@link ExchangeSink} for the given sink instance handle.
+     *
+     * @param handle identifies the sink instance to create the sink for
+     * @param preserveRecordsOrder presumably requests that records keep the order they were added in; confirm with implementations
+     * @return the {@link ExchangeSink} for the given handle
+     */
+    ExchangeSink createSink(ExchangeSinkInstanceHandle handle, boolean preserveRecordsOrder);
+
+    /**
+     * Creates an {@link ExchangeSource} for the given source handles.
+     *
+     * @param handles source handles to create the source for
+     * @return the {@link ExchangeSource} over the given handles
+     */
+    ExchangeSource createSource(List<ExchangeSourceHandle> handles);
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeManagerFactory.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeManagerFactory.java
new file mode 100644
index 000000000..7e186fba3
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeManagerFactory.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+import java.util.Map;
+
+/**
+ * Factory that creates {@link ExchangeManager} instances from configuration.
+ */
+public interface ExchangeManagerFactory
+{
+    /** Returns the name of this factory. */
+    String getName();
+
+    /**
+     * Creates an exchange manager.
+     *
+     * @param config configuration properties as string key/value pairs
+     * @return the configured {@link ExchangeManager}
+     */
+    ExchangeManager create(Map<String, String> config);
+
+    /** Returns the resolver that exposes the handle classes belonging to this factory. */
+    ExchangeManagerHandleResolver getHandleResolver();
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeManagerHandleResolver.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeManagerHandleResolver.java
new file mode 100644
index 000000000..faa4fc407
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeManagerHandleResolver.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+/**
+ * Exposes the concrete handle classes used by an exchange manager implementation.
+ */
+public interface ExchangeManagerHandleResolver
+{
+    /** Returns the implementation class of {@link ExchangeSinkInstanceHandle}. */
+    Class<? extends ExchangeSinkInstanceHandle> getExchangeSinkInstanceHandleClass();
+
+    /** Returns the implementation class of {@link ExchangeSourceHandle}. */
+    Class<? extends ExchangeSourceHandle> getExchangeSourceHandleClass();
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSink.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSink.java
new file mode 100644
index 000000000..ff849d85f
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSink.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+import io.airlift.slice.Slice;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Write side of an exchange: accepts pages, as {@link Slice}s, for individual partitions.
+ */
+public interface ExchangeSink
+{
+    /** Already-completed future that implementations can return from {@link #isBlocked()}. */
+    CompletableFuture<Void> NOT_BLOCKED = CompletableFuture.completedFuture(null);
+
+    /** Returns a future that presumably completes once the sink can accept more data. */
+    CompletableFuture<Void> isBlocked();
+
+    /**
+     * Adds a page to the given partition.
+     *
+     * @param partitionId partition the page is added to
+     * @param page page contents as a {@link Slice}
+     */
+    void add(int partitionId, Slice page);
+
+    /** Returns the memory usage reported by this sink (presumably in bytes). */
+    long getMemoryUsage();
+
+    /** Finishes the sink; the returned future tracks completion of that operation. */
+    CompletableFuture<Void> finish();
+
+    /** Aborts the sink; the returned future tracks completion of that operation. */
+    CompletableFuture<Void> abort();
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSinkHandle.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSinkHandle.java
new file mode 100644
index 000000000..3cb7eeffc
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSinkHandle.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+/**
+ * Marker interface for an implementation-defined handle to a sink of an exchange.
+ *
+ * <p>Returned by {@code Exchange.addSink} and passed to {@code Exchange.instantiateSink}.
+ */
+public interface ExchangeSinkHandle
+{
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSinkInstanceHandle.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSinkInstanceHandle.java
new file mode 100644
index 000000000..67df5c9af
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSinkInstanceHandle.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+/**
+ * Marker interface for an implementation-defined handle to one instance of an exchange sink.
+ *
+ * <p>Returned by {@code Exchange.instantiateSink} and consumed by
+ * {@link ExchangeManager#createSink} and {@code Exchange.sinkFinished}.
+ */
+public interface ExchangeSinkInstanceHandle
+{
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSource.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSource.java
new file mode 100644
index 000000000..f4f51dc16
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSource.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+import io.airlift.slice.Slice;
+
+import javax.annotation.Nullable;
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.Closeable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Read side of an exchange: hands out data as {@link Slice}s.
+ */
+@ThreadSafe
+public interface ExchangeSource
+        extends Closeable
+{
+    /** Already-completed future that implementations can return from {@link #isBlocked()}. */
+    CompletableFuture<Void> NOT_BLOCKED = CompletableFuture.completedFuture(null);
+
+    /** Returns a future that presumably completes once {@link #read()} may make progress. */
+    CompletableFuture<?> isBlocked();
+
+    /** Returns whether this source is finished (presumably: no more data will be returned). */
+    boolean isFinished();
+
+    /** Returns the next piece of data, or {@code null} when this call has none to return. */
+    @Nullable
+    Slice read();
+
+    /** Returns the memory usage reported by this source (presumably in bytes). */
+    long getMemoryUsage();
+
+    /** Closes the source; narrows {@link Closeable#close()} to declare no checked exception. */
+    @Override
+    void close();
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSourceHandle.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSourceHandle.java
new file mode 100644
index 000000000..20471211e
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSourceHandle.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+/**
+ * Implementation-defined handle to exchange data, tagged with a partition id.
+ *
+ * <p>{@code equals}, {@code hashCode} and {@code toString} are redeclared so that
+ * they are an explicit part of this interface's contract.
+ */
+public interface ExchangeSourceHandle
+{
+    /** Returns the partition id of this handle. */
+    int getPartitionId();
+
+    /** Returns the retained size of this handle, in bytes. */
+    long getRetainedSizeInBytes();
+
+    @Override
+    boolean equals(Object obj);
+
+    @Override
+    int hashCode();
+
+    @Override
+    String toString();
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSourceSplitter.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSourceSplitter.java
new file mode 100644
index 000000000..6803634f4
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSourceSplitter.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+import java.io.Closeable;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Splits exchange source data into sub partitions that are retrieved one at a time.
+ */
+public interface ExchangeSourceSplitter
+        extends Closeable
+{
+    /**
+     * Returns a future that will be completed when the splitter becomes unblocked.
+     */
+    CompletableFuture<Void> isBlocked();
+
+    /**
+     * Returns next sub partition, or {@link Optional#empty()} if the splitting process is finished.
+     */
+    Optional<ExchangeSourceHandle> getNext();
+
+    /** Closes the splitter; narrows {@link Closeable#close()} to declare no checked exception. */
+    @Override
+    void close();
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSourceStatistics.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSourceStatistics.java
new file mode 100644
index 000000000..fffdce130
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/ExchangeSourceStatistics.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+/**
+ * Immutable holder for statistics of an exchange source; currently only a size in bytes.
+ */
+public class ExchangeSourceStatistics
+{
+    private final long sizeInBytes;
+
+    /**
+     * @param sizeInBytes size to report, in bytes
+     */
+    public ExchangeSourceStatistics(long sizeInBytes)
+    {
+        this.sizeInBytes = sizeInBytes;
+    }
+
+    /** Returns the size, in bytes, supplied at construction. */
+    public long getSizeInBytes()
+    {
+        return this.sizeInBytes;
+    }
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/RetryMode.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/RetryMode.java
new file mode 100644
index 000000000..c2feb844b
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/RetryMode.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+/**
+ * Whether retries are enabled.
+ */
+public enum RetryMode
+{
+    /** Retries are disabled. */
+    NO_RETRIES,
+    /** Retries are enabled. */
+    RETRIES_ENABLED,
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/exchange/RetryPolicy.java b/presto-spi/src/main/java/io/prestosql/spi/exchange/RetryPolicy.java
new file mode 100644
index 000000000..55c248f6e
--- /dev/null
+++ b/presto-spi/src/main/java/io/prestosql/spi/exchange/RetryPolicy.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+/**
+ * Retry policies, each mapped to the {@link RetryMode} it implies.
+ */
+public enum RetryPolicy
+{
+    /** Maps to {@link RetryMode#RETRIES_ENABLED} (presumably retries at task granularity). */
+    TASK(RetryMode.RETRIES_ENABLED),
+    /** Maps to {@link RetryMode#NO_RETRIES}. */
+    NONE(RetryMode.NO_RETRIES),
+    /**/;
+
+    private final RetryMode retryMode;
+
+    RetryPolicy(RetryMode mode)
+    {
+        retryMode = mode;
+    }
+
+    /** Returns the retry mode associated with this policy. */
+    public RetryMode getRetryMode()
+    {
+        return retryMode;
+    }
+}
diff --git a/presto-spi/src/main/java/io/prestosql/spi/plan/PlanNodeId.java b/presto-spi/src/main/java/io/prestosql/spi/plan/PlanNodeId.java
index 15e4f45f2..5937fd97a 100644
--- a/presto-spi/src/main/java/io/prestosql/spi/plan/PlanNodeId.java
+++ b/presto-spi/src/main/java/io/prestosql/spi/plan/PlanNodeId.java
@@ -15,14 +15,17 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;
+import org.openjdk.jol.info.ClassLayout;
import javax.annotation.concurrent.Immutable;
+import static io.prestosql.spi.EstimateSizeUtil.estimatedSizeOf;
import static java.util.Objects.requireNonNull;
@Immutable
public class PlanNodeId
{
+ private static final int INSTANCE_SIZE = ClassLayout.parseClass(PlanNodeId.class).instanceSize(); // instance size of PlanNodeId as reported by JOL's ClassLayout; base term of getRetainedSizeInBytes()
private final String id;
@JsonCreator
@@ -63,4 +66,10 @@ public int hashCode()
{
return id.hashCode();
}
+
+    /** Returns the instance size of this object plus the estimated size of its id string. */
+    public long getRetainedSizeInBytes()
+    {
+        return INSTANCE_SIZE + estimatedSizeOf(id);
+    }
}
diff --git a/presto-spi/src/test/java/io/prestosql/spi/exchange/TestExchangeManager.java b/presto-spi/src/test/java/io/prestosql/spi/exchange/TestExchangeManager.java
new file mode 100644
index 000000000..126d75ac7
--- /dev/null
+++ b/presto-spi/src/test/java/io/prestosql/spi/exchange/TestExchangeManager.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.prestosql.spi.exchange;
+
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Test support: builds a stub {@link ExchangeManager} whose methods do nothing or return {@code null}.
+ */
+public abstract class TestExchangeManager
+{
+    /**
+     * Creates a stub manager: {@code createExchange} returns an {@link Exchange} whose methods
+     * are no-ops or return {@code null}; {@code createSink} and {@code createSource} return {@code null}.
+     */
+    public static ExchangeManager createExchangeManager()
+    {
+        return new ExchangeManager()
+        {
+            @Override
+            public Exchange createExchange(ExchangeContext context, int outputPartitionCount)
+            {
+                return new Exchange()
+                {
+                    @Override
+                    public ExchangeSinkHandle addSink(int taskPartitionId)
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public void noMoreSinks()
+                    {
+                    }
+
+                    @Override
+                    public ExchangeSinkInstanceHandle instantiateSink(ExchangeSinkHandle sinkHandle, int taskAttemptId)
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public void sinkFinished(ExchangeSinkInstanceHandle handle)
+                    {
+                    }
+
+                    @Override
+                    public CompletableFuture<List<ExchangeSourceHandle>> getSourceHandles()
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public ExchangeSourceSplitter split(ExchangeSourceHandle handle, long targetSizeInBytes)
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public ExchangeSourceStatistics getExchangeSourceStatistics(ExchangeSourceHandle handle)
+                    {
+                        return null;
+                    }
+
+                    @Override
+                    public void close()
+                    {
+                    }
+                };
+            }
+
+            @Override
+            public ExchangeSink createSink(ExchangeSinkInstanceHandle handle, boolean preserveRecordsOrder)
+            {
+                return null;
+            }
+
+            @Override
+            public ExchangeSource createSource(List<ExchangeSourceHandle> handles)
+            {
+                return null;
+            }
+        };
+    }
+}