From 98150a8a55bd19d5d53052b5aed2ed094b3b0940 Mon Sep 17 00:00:00 2001 From: Surya Sumanth N Date: Thu, 9 Feb 2023 17:30:00 +0530 Subject: [PATCH] Hybrid Exchange Buffer --- .../filesystem/HetuHdfsFileSystemClient.java | 10 + .../filesystem/HetuLocalFileSystemClient.java | 7 + .../io/prestosql/exchange/ExchangeSink.java | 15 ++ .../exchange/FileSystemExchangeManager.java | 6 +- .../exchange/FileSystemExchangeSink.java | 37 ++- .../FileSystemExchangeSinkInstanceHandle.java | 6 + .../io/prestosql/exchange/RetryPolicy.java | 1 + .../storage/HetuFileSystemExchangeWriter.java | 25 +- .../execution/SqlStageExecution.java | 6 +- .../buffer/HybridSpoolingBuffer.java | 210 +++++++++++++++ .../execution/buffer/LazyOutputBuffer.java | 82 +++++- .../execution/buffer/OutputBuffer.java | 27 ++ .../scheduler/SqlQueryScheduler.java | 3 +- .../operator/ExchangeClientFactory.java | 1 + .../operator/TaskOutputOperator.java | 23 ++ .../operator/output/PagePartitioner.java | 19 ++ .../testing/TestingRecoveryUtils.java | 6 + .../buffer/TestHybridSpoolingBuffer.java | 249 ++++++++++++++++++ .../TestSpoolingExchangeOutputBuffer.java | 36 ++- .../spi/filesystem/HetuFileSystemClient.java | 3 + 20 files changed, 753 insertions(+), 19 deletions(-) create mode 100644 presto-main/src/main/java/io/prestosql/execution/buffer/HybridSpoolingBuffer.java create mode 100644 presto-main/src/test/java/io/prestosql/execution/buffer/TestHybridSpoolingBuffer.java diff --git a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuHdfsFileSystemClient.java b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuHdfsFileSystemClient.java index 61bacc659..16c634a87 100644 --- a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuHdfsFileSystemClient.java +++ b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuHdfsFileSystemClient.java @@ -18,6 +18,7 @@ import com.google.common.base.Throwables; import io.airlift.log.Logger; import io.prestosql.spi.filesystem.SupportedFileAttributes; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; @@ -267,6 +268,15 @@ public class HetuHdfsFileSystemClient } } + @Override + public void flush(OutputStream outputStream) + throws IOException + { + if (outputStream instanceof FSDataOutputStream) { + ((FSDataOutputStream) outputStream).hflush(); + } + } + @Override public void close() throws IOException diff --git a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuLocalFileSystemClient.java b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuLocalFileSystemClient.java index f9e0c9eb8..c4a52c158 100644 --- a/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuLocalFileSystemClient.java +++ b/hetu-filesystem-client/src/main/java/io/hetu/core/filesystem/HetuLocalFileSystemClient.java @@ -249,4 +249,11 @@ public class HetuLocalFileSystemClient String glob = prefix + "*" + suffix; return StreamSupport.stream(newDirectoryStream(path, glob).spliterator(), false); } + + @Override + public void flush(OutputStream outputStream) + throws IOException + { + outputStream.flush(); + } } diff --git a/presto-main/src/main/java/io/prestosql/exchange/ExchangeSink.java b/presto-main/src/main/java/io/prestosql/exchange/ExchangeSink.java index eb174df1a..0386dcd41 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/ExchangeSink.java +++ b/presto-main/src/main/java/io/prestosql/exchange/ExchangeSink.java @@ -16,8 +16,13 @@ package io.prestosql.exchange; import io.airlift.slice.Slice; import io.hetu.core.transport.execution.buffer.PagesSerde; import io.prestosql.exchange.FileSystemExchangeConfig.DirectSerialisationType; +import io.prestosql.exchange.storage.FileSystemExchangeStorage; import io.prestosql.spi.Page; +import javax.crypto.SecretKey; + +import java.net.URI; +import java.util.Optional; import java.util.concurrent.CompletableFuture; public interface ExchangeSink @@ -40,4 +45,14 @@ public interface ExchangeSink { return DirectSerialisationType.OFF; } + + FileSystemExchangeStorage getExchangeStorage(); + + URI getOutputDirectory(); + + Optional getSecretKey(); + + boolean isExchangeCompressionEnabled(); + + int getPartitionId(); } diff --git a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeManager.java b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeManager.java index c7fa8b459..ec4424b4b 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeManager.java +++ b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeManager.java @@ -137,7 +137,8 @@ public class FileSystemExchangeManager exchangeSinkBuffersPerPartition, exchangeSinkMaxFileSizeInBytes, directSerialisationType, - directSerialisationBufferSize); + directSerialisationBufferSize, + instanceHandle.getPartiitonId()); } @Override @@ -157,7 +158,8 @@ public class FileSystemExchangeManager exchangeSinkBuffersPerPartition, exchangeSinkMaxFileSizeInBytes, serType, - directSerialisationBufferSize); + directSerialisationBufferSize, + instanceHandle.getPartiitonId()); } @Override diff --git a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSink.java b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSink.java index d985a4d46..b2f8e77f8 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSink.java +++ b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSink.java @@ -91,6 +91,7 @@ public class FileSystemExchangeSink private boolean closed; private final DirectSerialisationType directSerialisationType; private final int directSerialisationBufferSize; + private final int partitionId; public FileSystemExchangeSink( FileSystemExchangeStorage exchangeStorage, @@ -105,7 +106,8 @@ public class FileSystemExchangeSink int exchangeSinkBuffersPerPartition, long maxFileSizeInBytes, DirectSerialisationType directSerialisationType, - int directSerialisationBufferSize) + int directSerialisationBufferSize, + int partitionId) { checkArgument(maxPageStorageSizeInBytes <= maxFileSizeInBytes, format("maxPageStorageSizeInBytes %s exceeded maxFileSizeInBytes %s", succinctBytes(maxPageStorageSizeInBytes), succinctBytes(maxFileSizeInBytes))); @@ -129,6 +131,7 @@ public class FileSystemExchangeSink else { this.bufferPool = null; } + this.partitionId = partitionId; } @Override @@ -142,6 +145,36 @@ public class FileSystemExchangeSink return directSerialisationType; } + @Override + public URI getOutputDirectory() + { + return outputDirectory; + } + + @Override + public Optional getSecretKey() + { + return secretKey; + } + + @Override + public boolean isExchangeCompressionEnabled() + { + return exchangeCompressionEnabled; + } + + @Override + public FileSystemExchangeStorage getExchangeStorage() + { + return exchangeStorage; + } + + @Override + public int getPartitionId() + { + return partitionId; + } + @Override public void add(int partitionId, Slice data) { @@ -428,7 +461,7 @@ public class FileSystemExchangeSink currentBuffer.writeBytes(slice.getBytes(position, writableBytes)); position += writableBytes; - flushIfNeeded(false); + flushIfNeeded(true); } } diff --git a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSinkInstanceHandle.java b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSinkInstanceHandle.java index 9e86443ac..74c11ae6b 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSinkInstanceHandle.java +++ b/presto-main/src/main/java/io/prestosql/exchange/FileSystemExchangeSinkInstanceHandle.java @@ -55,4 +55,10 @@ public class FileSystemExchangeSinkInstanceHandle { return outputPartitionCount; } + + @JsonProperty + public int getPartiitonId() + { + return sinkHandle.getPartitionId(); + } } diff --git a/presto-main/src/main/java/io/prestosql/exchange/RetryPolicy.java b/presto-main/src/main/java/io/prestosql/exchange/RetryPolicy.java index d03e98e94..459598ee7 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/RetryPolicy.java +++ b/presto-main/src/main/java/io/prestosql/exchange/RetryPolicy.java @@ -19,6 +19,7 @@ public enum RetryPolicy { TASK(RetryMode.RETRIES_ENABLED), NONE(RetryMode.NO_RETRIES), + TASK_ASYNC(RetryMode.RETRIES_ENABLED), /**/; private final RetryMode retryMode; diff --git a/presto-main/src/main/java/io/prestosql/exchange/storage/HetuFileSystemExchangeWriter.java b/presto-main/src/main/java/io/prestosql/exchange/storage/HetuFileSystemExchangeWriter.java index 79e8b0902..49bbfc769 100644 --- a/presto-main/src/main/java/io/prestosql/exchange/storage/HetuFileSystemExchangeWriter.java +++ b/presto-main/src/main/java/io/prestosql/exchange/storage/HetuFileSystemExchangeWriter.java @@ -55,35 +55,39 @@ public class HetuFileSystemExchangeWriter private final OutputStream outputStream; private final DirectSerialisationType directSerialisationType; private final int directSerialisationBufferSize; + private final HetuFileSystemClient fileSystemClient; + private final OutputStream delegateOutputStream; public HetuFileSystemExchangeWriter(URI file, HetuFileSystemClient fileSystemClient, Optional secretKey, boolean exchangeCompressionEnabled, AlgorithmParameterSpec algorithmParameterSpec, FileSystemExchangeConfig.DirectSerialisationType directSerialisationType, int directSerialisationBufferSize) { this.directSerialisationBufferSize = directSerialisationBufferSize; this.directSerialisationType = directSerialisationType; + this.fileSystemClient = fileSystemClient; try { Path path = Paths.get(file.toString()); + this.delegateOutputStream = fileSystemClient.newOutputStream(path); if (secretKey.isPresent() && exchangeCompressionEnabled) { Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION); cipher.init(Cipher.ENCRYPT_MODE, secretKey.get(), algorithmParameterSpec); - this.outputStream = new SnappyFramedOutputStream(new CipherOutputStream(fileSystemClient.newOutputStream(path), cipher)); + this.outputStream = new SnappyFramedOutputStream(new CipherOutputStream(delegateOutputStream, cipher)); } else if (secretKey.isPresent()) { Cipher cipher = Cipher.getInstance(CIPHER_TRANSFORMATION); cipher.init(Cipher.ENCRYPT_MODE, secretKey.get(), algorithmParameterSpec); - this.outputStream = new CipherOutputStream(fileSystemClient.newOutputStream(path), cipher); + this.outputStream = new CipherOutputStream(delegateOutputStream, cipher); } else if (exchangeCompressionEnabled) { - this.outputStream = new SnappyFramedOutputStream(new OutputStreamSliceOutput(fileSystemClient.newOutputStream(path), directSerialisationBufferSize)); + this.outputStream = new SnappyFramedOutputStream(new OutputStreamSliceOutput(delegateOutputStream, directSerialisationBufferSize)); } else { if (directSerialisationType == DirectSerialisationType.KRYO) { - this.outputStream = new Output(fileSystemClient.newOutputStream(path), directSerialisationBufferSize); + this.outputStream = new Output(delegateOutputStream, directSerialisationBufferSize); } else if (directSerialisationType == DirectSerialisationType.JAVA) { - this.outputStream = new OutputStreamSliceOutput(fileSystemClient.newOutputStream(path), directSerialisationBufferSize); + this.outputStream = new OutputStreamSliceOutput(delegateOutputStream, directSerialisationBufferSize); } else { - this.outputStream = new OutputStreamSliceOutput(fileSystemClient.newOutputStream(path), directSerialisationBufferSize); + this.outputStream = new OutputStreamSliceOutput(delegateOutputStream, directSerialisationBufferSize); } } } @@ -98,6 +102,8 @@ public class HetuFileSystemExchangeWriter { try { outputStream.write(slice.getBytes()); + outputStream.flush(); + fileSystemClient.flush(delegateOutputStream); } catch (IOException | RuntimeException e) { return immediateFailedFuture(e); @@ -110,6 +116,13 @@ public class HetuFileSystemExchangeWriter { checkState(directSerialisationType != DirectSerialisationType.OFF, "Should be used with direct serialization is enabled!"); serde.serialize(outputStream, page); + try { + outputStream.flush(); + fileSystemClient.flush(delegateOutputStream); + } + catch (IOException | RuntimeException e) { + return immediateFailedFuture(e); + } return immediateFuture(null); } diff --git a/presto-main/src/main/java/io/prestosql/execution/SqlStageExecution.java b/presto-main/src/main/java/io/prestosql/execution/SqlStageExecution.java index ad847703e..b3a873dd7 100644 --- a/presto-main/src/main/java/io/prestosql/execution/SqlStageExecution.java +++ b/presto-main/src/main/java/io/prestosql/execution/SqlStageExecution.java @@ -550,7 +550,9 @@ public final class SqlStageExecution if (this.outputBuffers.compareAndSet(currentOutputBuffers, outputBuffers)) { for (RemoteTask task : getAllTasks()) { - task.setOutputBuffers(outputBuffers); + OutputBuffers localOutputBuffers = new OutputBuffers(outputBuffers.getType(), outputBuffers.getVersion(), + outputBuffers.isNoMoreBufferIds(), outputBuffers.getBuffers(), outputBuffers.getExchangeSinkInstanceHandle()); + task.setOutputBuffers(localOutputBuffers); } return; } @@ -658,6 +660,8 @@ public final class SqlStageExecution }); OutputBuffers localOutputBuffers = this.outputBuffers.get(); + localOutputBuffers = new OutputBuffers(localOutputBuffers.getType(), localOutputBuffers.getVersion(), + localOutputBuffers.isNoMoreBufferIds(), localOutputBuffers.getBuffers(), localOutputBuffers.getExchangeSinkInstanceHandle()); checkState(localOutputBuffers != null, "Initial output buffers must be set before a task can be scheduled"); if (sinkExchange.isPresent()) { diff --git a/presto-main/src/main/java/io/prestosql/execution/buffer/HybridSpoolingBuffer.java b/presto-main/src/main/java/io/prestosql/execution/buffer/HybridSpoolingBuffer.java new file mode 100644 index 000000000..98beb1bc2 --- /dev/null +++ b/presto-main/src/main/java/io/prestosql/execution/buffer/HybridSpoolingBuffer.java @@ -0,0 +1,210 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.execution.buffer; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.log.Logger; +import io.airlift.slice.Slice; +import io.airlift.units.DataSize; +import io.hetu.core.transport.execution.buffer.PagesSerde; +import io.hetu.core.transport.execution.buffer.PagesSerdeUtil; +import io.hetu.core.transport.execution.buffer.SerializedPage; +import io.prestosql.exchange.ExchangeManager; +import io.prestosql.exchange.ExchangeSink; +import io.prestosql.exchange.ExchangeSource; +import io.prestosql.exchange.ExchangeSourceHandle; +import io.prestosql.exchange.FileStatus; +import io.prestosql.exchange.FileSystemExchangeConfig; +import io.prestosql.exchange.FileSystemExchangeSourceHandle; +import io.prestosql.exchange.storage.FileSystemExchangeStorage; +import io.prestosql.memory.context.LocalMemoryContext; +import io.prestosql.spi.Page; + +import javax.crypto.SecretKey; + +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.function.Supplier; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.util.concurrent.Futures.immediateFuture; +import static io.airlift.concurrent.Threads.daemonThreadsNamed; +import static io.prestosql.exchange.FileSystemExchangeSink.DATA_FILE_SUFFIX; +import static java.util.concurrent.Executors.newCachedThreadPool; + +public class HybridSpoolingBuffer + extends SpoolingExchangeOutputBuffer +{ + private static final String PARENT_URI = ".."; + private static final Logger LOG = Logger.get(HybridSpoolingBuffer.class); + + private final OutputBufferStateMachine stateMachine; + private final OutputBuffers outputBuffers; + private final ExchangeSink exchangeSink; + private ExchangeSource exchangeSource; + private final Supplier memoryContextSupplier; + private final ExecutorService executor; + private final ExchangeManager exchangeManager; + private int token; + private PagesSerde serde; + private PagesSerde javaSerde; + private PagesSerde kryoSerde; + + private URI outputDirectory; + + public HybridSpoolingBuffer(OutputBufferStateMachine stateMachine, OutputBuffers outputBuffers, ExchangeSink exchangeSink, Supplier memoryContextSupplier, ExchangeManager exchangeManager) + { + super(stateMachine, outputBuffers, exchangeSink, memoryContextSupplier); + this.stateMachine = stateMachine; + this.outputBuffers = outputBuffers; + this.exchangeSink = exchangeSink; + this.memoryContextSupplier = memoryContextSupplier; + this.outputDirectory = exchangeSink.getOutputDirectory().resolve(PARENT_URI); + this.executor = newCachedThreadPool(daemonThreadsNamed("exchange-source-handles-creation-%s")); + this.exchangeManager = exchangeManager; + } + + @Override + public ListenableFuture get(OutputBuffers.OutputBufferId bufferId, long token, DataSize maxSize) + { + if (exchangeSource == null) { + exchangeSource = createExchangeSource(exchangeSink.getPartitionId()); + if (exchangeSource == null) { + return immediateFuture(BufferResult.emptyResults(token, false)); + } + } + return immediateFuture(readPages(token)); + } + + private BufferResult readPages(long tokenId) + { + List result = new ArrayList<>(); + FileSystemExchangeConfig.DirectSerialisationType directSerialisationType = exchangeSource.getDirectSerialisationType(); + if (directSerialisationType != FileSystemExchangeConfig.DirectSerialisationType.OFF) { + PagesSerde pagesSerde = (directSerialisationType == FileSystemExchangeConfig.DirectSerialisationType.JAVA) ? javaSerde : kryoSerde; + while (token < tokenId) { + exchangeSource.readPage(pagesSerde); + token++; + } + Page page = exchangeSource.readPage(pagesSerde); + result.add(serde.serialize(page)); + } + else { + while (token < tokenId) { + exchangeSource.read(); + token++; + } + Slice slice = exchangeSource.read(); + SerializedPage serializedPage = slice != null ? PagesSerdeUtil.readSerializedPage(slice) : null; + result.add(serializedPage); + } + return new BufferResult(tokenId, tokenId + result.size(), false, result); + } + + public ExchangeSource createExchangeSource(int partitionId) + { + ExchangeHandleInfo exchangeHandleInfo = getExchangeHandleInfo(exchangeSink); + ListenableFuture> fileStatus = getFileStatus(outputDirectory); + List completedFileStatus = new ArrayList<>(); + if (!fileStatus.isDone()) { + return null; + } + try { + completedFileStatus = fileStatus.get(); + } + catch (Exception e) { + LOG.debug("Failed in creating exchange source with outputDirectory" + outputDirectory); + } + List handles = ImmutableList.of(new FileSystemExchangeSourceHandle(partitionId, + completedFileStatus, exchangeHandleInfo.getSecretKey(), exchangeHandleInfo.isExchangeCompressionEnabled())); + + return exchangeManager.createSource(handles); + } + + private ListenableFuture> getFileStatus(URI path) + { + FileSystemExchangeStorage exchangeStorage = exchangeSink.getExchangeStorage(); + return Futures.transform(exchangeStorage.listFilesRecursively(path), + sinkOutputFiles -> sinkOutputFiles.stream().filter(file -> file.getFilePath().endsWith(DATA_FILE_SUFFIX)).collect(toImmutableList()), + executor); + } + + private ExchangeHandleInfo getExchangeHandleInfo(ExchangeSink exchangeSink) + { + return new ExchangeHandleInfo(exchangeSink.getOutputDirectory(), + exchangeSink.getExchangeStorage(), + exchangeSink.getSecretKey().map(SecretKey::getEncoded), + exchangeSink.isExchangeCompressionEnabled()); + } + + @Override + public void setJavaSerde(PagesSerde javaSerde) + { + this.javaSerde = javaSerde; + } + + @Override + public void setKryoSerde(PagesSerde kryoSerde) + { + this.kryoSerde = kryoSerde; + } + + @Override + public void setSerde(PagesSerde serde) + { + this.serde = serde; + } + + private static class ExchangeHandleInfo + { + URI outputDirectory; + FileSystemExchangeStorage exchangeStorage; + Optional secretKey; + boolean exchangeCompressionEnabled; + + ExchangeHandleInfo(URI outputDirectory, FileSystemExchangeStorage exchangeStorage, Optional secretKey, boolean exchangeCompressionEnabled) + { + this.outputDirectory = outputDirectory; + this.exchangeStorage = exchangeStorage; + this.secretKey = secretKey; + this.exchangeCompressionEnabled = exchangeCompressionEnabled; + } + + public URI getOutputDirectory() + { + return outputDirectory; + } + + public Optional getSecretKey() + { + return secretKey; + } + + public boolean isExchangeCompressionEnabled() + { + return exchangeCompressionEnabled; + } + + public FileSystemExchangeStorage getExchangeStorage() + { + return exchangeStorage; + } + } +} diff --git a/presto-main/src/main/java/io/prestosql/execution/buffer/LazyOutputBuffer.java b/presto-main/src/main/java/io/prestosql/execution/buffer/LazyOutputBuffer.java index 1e3e05712..86e7caad0 100644 --- a/presto-main/src/main/java/io/prestosql/execution/buffer/LazyOutputBuffer.java +++ b/presto-main/src/main/java/io/prestosql/execution/buffer/LazyOutputBuffer.java @@ -64,6 +64,9 @@ public class LazyOutputBuffer @GuardedBy("this") private OutputBuffer delegate; + @GuardedBy("this") + private OutputBuffer hybridSpoolingDelegate; + @GuardedBy("this") private final Set abortedBuffers = new HashSet<>(); @@ -72,6 +75,10 @@ public class LazyOutputBuffer private final ExchangeManagerRegistry exchangeManagerRegistry; private Optional exchangeSink; + private PagesSerde serde; + private PagesSerde javaSerde; + private PagesSerde kryoSerde; + public LazyOutputBuffer( TaskId taskId, Executor executor, @@ -161,6 +168,7 @@ public class LazyOutputBuffer Set abortedBuffersIds = ImmutableSet.of(); List bufferPendingReads = ImmutableList.of(); OutputBuffer outputBuffer; + ExchangeManager exchangeManager = null; synchronized (this) { if (delegate == null) { // ignore set output if buffer was already destroyed or failed @@ -185,7 +193,7 @@ public class LazyOutputBuffer if (newOutputBuffers.getExchangeSinkInstanceHandle().isPresent()) { ExchangeSinkInstanceHandle exchangeSinkInstanceHandle = newOutputBuffers.getExchangeSinkInstanceHandle() .orElseThrow(() -> new IllegalArgumentException("exchange sink handle is expected to be present for buffer type EXTERNAL")); - ExchangeManager exchangeManager = exchangeManagerRegistry.getExchangeManager(); + exchangeManager = exchangeManagerRegistry.getExchangeManager(); ExchangeSink exchangeSinkInstance = exchangeManager.createSink(exchangeSinkInstanceHandle, false); //TODO: create directories this.exchangeSink = Optional.ofNullable(exchangeSinkInstance); } @@ -199,11 +207,30 @@ public class LazyOutputBuffer bufferPendingReads = ImmutableList.copyOf(this.pendingReads); this.pendingReads.clear(); } - this.exchangeSink.ifPresent(sink -> delegate = new SpoolingExchangeOutputBuffer( - stateMachine, - newOutputBuffers, - sink, - systemMemoryContextSupplier)); + ExchangeManager finalExchangeManager = exchangeManager; + this.exchangeSink.ifPresent(sink -> { + if (delegate == null) { + delegate = new SpoolingExchangeOutputBuffer( + stateMachine, + newOutputBuffers, + sink, + systemMemoryContextSupplier); + } + else { + if (hybridSpoolingDelegate == null) { + hybridSpoolingDelegate = new HybridSpoolingBuffer(stateMachine, + newOutputBuffers, + sink, + systemMemoryContextSupplier, + finalExchangeManager); + if (hybridSpoolingDelegate != null) { + hybridSpoolingDelegate.setSerde(serde); + hybridSpoolingDelegate.setJavaSerde(javaSerde); + hybridSpoolingDelegate.setKryoSerde(kryoSerde); + } + } + } + }); outputBuffer = delegate; } @@ -325,6 +352,9 @@ public class LazyOutputBuffer checkState(delegate != null, "Buffer has not been initialized"); outputBuffer = delegate; } + if (hybridSpoolingDelegate != null) { + hybridSpoolingDelegate.setNoMorePages(); + } outputBuffer.setNoMorePages(); } @@ -488,4 +518,44 @@ public class LazyOutputBuffer } outputBuffer.enqueuePages(partition, pages, id, directSerde); } + + @Override + public boolean isSpoolingDelegateAvailable() + { + return delegate != null && hybridSpoolingDelegate != null && hybridSpoolingDelegate.isSpoolingOutputBuffer(); + } + + @Override + public OutputBuffer getSpoolingDelegate() + { + return hybridSpoolingDelegate; + } + + @Override + public DirectSerialisationType getDelegateSpoolingExchangeDirectSerializationType() + { + DirectSerialisationType type = DirectSerialisationType.JAVA; + if (hybridSpoolingDelegate != null) { + type = hybridSpoolingDelegate.getExchangeDirectSerialisationType(); + } + return type; + } + + @Override + public void setSerde(PagesSerde serde) + { + this.serde = serde; + } + + @Override + public void setJavaSerde(PagesSerde javaSerde) + { + this.javaSerde = javaSerde; + } + + @Override + public void setKryoSerde(PagesSerde kryoSerde) + { + this.kryoSerde = kryoSerde; + } } diff --git a/presto-main/src/main/java/io/prestosql/execution/buffer/OutputBuffer.java b/presto-main/src/main/java/io/prestosql/execution/buffer/OutputBuffer.java index 3e6cfbeb1..773b27056 100644 --- a/presto-main/src/main/java/io/prestosql/execution/buffer/OutputBuffer.java +++ b/presto-main/src/main/java/io/prestosql/execution/buffer/OutputBuffer.java @@ -176,4 +176,31 @@ public interface OutputBuffer { return; } + + default boolean isSpoolingDelegateAvailable() + { + return false; + } + + default OutputBuffer getSpoolingDelegate() + { + return null; + } + + default DirectSerialisationType getDelegateSpoolingExchangeDirectSerializationType() + { + return DirectSerialisationType.JAVA; + } + + default void setSerde(PagesSerde pagesSerde) + { + } + + default void setJavaSerde(PagesSerde pagesSerde) + { + } + + default void setKryoSerde(PagesSerde pagesSerde) + { + } } diff --git a/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java b/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java index 621438ed9..9fa71a151 100644 --- a/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java +++ b/presto-main/src/main/java/io/prestosql/execution/scheduler/SqlQueryScheduler.java @@ -149,6 +149,7 @@ import static io.prestosql.SystemSessionProperties.getWriterMinSize; import static io.prestosql.SystemSessionProperties.isQueryResourceTrackingEnabled; import static io.prestosql.SystemSessionProperties.isReuseTableScanEnabled; import static io.prestosql.exchange.RetryPolicy.TASK; +import static io.prestosql.exchange.RetryPolicy.TASK_ASYNC; import static io.prestosql.execution.BasicStageStats.aggregateBasicStageStats; import static io.prestosql.execution.QueryState.FINISHING; import static io.prestosql.execution.QueryState.RECOVERING; @@ -633,7 +634,7 @@ public class SqlQueryScheduler StageId stageId = new StageId(queryStateMachine.getQueryId(), nextStageId.getAndIncrement()); Optional exchange = Optional.empty(); - if (retryPolicy.equals(TASK)) { + if (retryPolicy.equals(TASK) || retryPolicy.equals(TASK_ASYNC)) { ExchangeManager exchangeManager = exchangeManagerRegistry.getExchangeManager(); exchange = createSqlStageExchange(exchangeManager, stageId); } diff --git a/presto-main/src/main/java/io/prestosql/operator/ExchangeClientFactory.java b/presto-main/src/main/java/io/prestosql/operator/ExchangeClientFactory.java index a41957238..ee89af2b0 100644 --- a/presto-main/src/main/java/io/prestosql/operator/ExchangeClientFactory.java +++ b/presto-main/src/main/java/io/prestosql/operator/ExchangeClientFactory.java @@ -138,6 +138,7 @@ public class ExchangeClientFactory buffer = new DeduplicatingDirectExchangeBuffer(scheduler, deduplicationBufferSize, retryPolicy, exchangeManagerRegistry, queryId, exchangeId); break; case NONE: + case TASK_ASYNC: buffer = null; break; default: diff --git a/presto-main/src/main/java/io/prestosql/operator/TaskOutputOperator.java b/presto-main/src/main/java/io/prestosql/operator/TaskOutputOperator.java index b4410f694..737ee270f 100644 --- a/presto-main/src/main/java/io/prestosql/operator/TaskOutputOperator.java +++ b/presto-main/src/main/java/io/prestosql/operator/TaskOutputOperator.java @@ -123,6 +123,8 @@ public class TaskOutputOperator private final SingleInputSnapshotState snapshotState; private final boolean isStage0; private final PagesSerde serde; + private final PagesSerde javaSerde; + private final PagesSerde kryoSerde; private boolean finished; public TaskOutputOperator(String id, OperatorContext operatorContext, OutputBuffer outputBuffer, Function pagePreprocessor) @@ -132,6 +134,11 @@ public class TaskOutputOperator this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null"); this.pagePreprocessor = requireNonNull(pagePreprocessor, "pagePreprocessor is null"); this.serde = requireNonNull(operatorContext.getDriverContext().getSerde(), "serde is null"); + this.javaSerde = requireNonNull(operatorContext.getDriverContext().getJavaSerde(), "javaSerde is null"); + this.kryoSerde = requireNonNull(operatorContext.getDriverContext().getKryoSerde(), "kryoSerde is null"); + this.outputBuffer.setSerde(serde); + this.outputBuffer.setJavaSerde(javaSerde); + this.outputBuffer.setKryoSerde(kryoSerde); this.snapshotState = operatorContext.isSnapshotEnabled() ? SingleInputSnapshotState.forOperator(this, operatorContext) : null; this.isStage0 = operatorContext.getDriverContext().getPipelineContext().getTaskContext().getTaskId().getStageId().getId() == 0; } @@ -192,6 +199,7 @@ public class TaskOutputOperator } DirectSerialisationType serialisationType = outputBuffer.getExchangeDirectSerialisationType(); + DirectSerialisationType spoolingSerialisationType = outputBuffer.getDelegateSpoolingExchangeDirectSerializationType(); if (outputBuffer.isSpoolingOutputBuffer() && serialisationType != DirectSerialisationType.OFF) { PagesSerde directSerde = (serialisationType == DirectSerialisationType.JAVA) ? operatorContext.getDriverContext().getJavaSerde() : operatorContext.getDriverContext().getKryoSerde(); List pages = splitPage(inputPage, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); @@ -215,6 +223,21 @@ public class TaskOutputOperator } } else { + if (outputBuffer.isSpoolingDelegateAvailable()) { + OutputBuffer spoolingBuffer = outputBuffer.getSpoolingDelegate(); + if (spoolingSerialisationType != DirectSerialisationType.OFF) { + PagesSerde directSerde = (spoolingSerialisationType == DirectSerialisationType.JAVA) ? operatorContext.getDriverContext().getJavaSerde() : operatorContext.getDriverContext().getKryoSerde(); + List pages = splitPage(inputPage, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + if (spoolingBuffer != null) { + spoolingBuffer.enqueuePages(0, pages, id, directSerde); + } + } + else { + if (spoolingBuffer != null) { + spoolingBuffer.enqueue(serializedPages, id); + } + } + } outputBuffer.enqueue(serializedPages, id); } } diff --git a/presto-main/src/main/java/io/prestosql/operator/output/PagePartitioner.java b/presto-main/src/main/java/io/prestosql/operator/output/PagePartitioner.java index bbc68e1c8..2debb2841 100644 --- a/presto-main/src/main/java/io/prestosql/operator/output/PagePartitioner.java +++ b/presto-main/src/main/java/io/prestosql/operator/output/PagePartitioner.java @@ -99,6 +99,9 @@ public class PagePartitioner this.outputBuffer = requireNonNull(outputBuffer, "outputBuffer is null"); this.sourceTypes = requireNonNull(sourceTypes, "sourceTypes is null").toArray(new Type[0]); this.operatorContext = requireNonNull(operatorContext, "serde is null"); + this.outputBuffer.setSerde(requireNonNull(operatorContext.getDriverContext().getSerde(), "serde is null")); + this.outputBuffer.setJavaSerde(requireNonNull(operatorContext.getDriverContext().getJavaSerde(), "java serde is null")); + this.outputBuffer.setKryoSerde(requireNonNull(operatorContext.getDriverContext().getKryoSerde(), "kryo serde is null")); int partitionCount = partitionFunction.getPartitionCount(); int pageSize = min(DEFAULT_MAX_PAGE_SIZE_IN_BYTES, ((int) maxMemory.toBytes()) / partitionCount); @@ -456,6 +459,7 @@ public class PagePartitioner partitionPageBuilder.reset(); FileSystemExchangeConfig.DirectSerialisationType serialisationType = outputBuffer.getExchangeDirectSerialisationType(); + FileSystemExchangeConfig.DirectSerialisationType spoolingSerialisationType = outputBuffer.getDelegateSpoolingExchangeDirectSerializationType(); if (outputBuffer.isSpoolingOutputBuffer() && serialisationType != FileSystemExchangeConfig.DirectSerialisationType.OFF) { PagesSerde directSerde = (serialisationType == FileSystemExchangeConfig.DirectSerialisationType.JAVA) ? operatorContext.getDriverContext().getJavaSerde() : operatorContext.getDriverContext().getKryoSerde(); List pages = splitPage(pagePartition, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); @@ -466,6 +470,21 @@ public class PagePartitioner .map(page -> operatorContext.getDriverContext().getSerde().serialize(page)) .collect(toImmutableList()); + if (outputBuffer.isSpoolingDelegateAvailable()) { + OutputBuffer spoolingBuffer = outputBuffer.getSpoolingDelegate(); + if (spoolingSerialisationType != FileSystemExchangeConfig.DirectSerialisationType.OFF) { + PagesSerde directSerde = (spoolingSerialisationType == FileSystemExchangeConfig.DirectSerialisationType.JAVA) ? operatorContext.getDriverContext().getJavaSerde() : operatorContext.getDriverContext().getKryoSerde(); + List pages = splitPage(pagePartition, DEFAULT_MAX_PAGE_SIZE_IN_BYTES); + if (spoolingBuffer != null) { + spoolingBuffer.enqueuePages(partition, pages, id, directSerde); + } + } + else { + if (spoolingBuffer != null) { + spoolingBuffer.enqueue(partition, serializedPages, id); + } + } + } outputBuffer.enqueue(partition, serializedPages, id); } pagesAdded.incrementAndGet(); diff --git a/presto-main/src/main/java/io/prestosql/testing/TestingRecoveryUtils.java b/presto-main/src/main/java/io/prestosql/testing/TestingRecoveryUtils.java index c97655402..8203dcd53 100644 --- a/presto-main/src/main/java/io/prestosql/testing/TestingRecoveryUtils.java +++ b/presto-main/src/main/java/io/prestosql/testing/TestingRecoveryUtils.java @@ -95,6 +95,12 @@ public class TestingRecoveryUtils { } + @Override + public void flush(OutputStream outputStream) + throws IOException + { + } + @Override public InputStream newInputStream(Path path) { diff --git a/presto-main/src/test/java/io/prestosql/execution/buffer/TestHybridSpoolingBuffer.java b/presto-main/src/test/java/io/prestosql/execution/buffer/TestHybridSpoolingBuffer.java new file mode 100644 index 000000000..51451966b --- /dev/null +++ b/presto-main/src/test/java/io/prestosql/execution/buffer/TestHybridSpoolingBuffer.java @@ -0,0 +1,249 @@ +/* + * Copyright (C) 2018-2022. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.prestosql.execution.buffer; + +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import io.airlift.units.DataSize; +import io.hetu.core.filesystem.HetuLocalFileSystemClient; +import io.hetu.core.filesystem.LocalConfig; +import io.hetu.core.transport.execution.buffer.PagesSerde; +import io.hetu.core.transport.execution.buffer.PagesSerdeFactory; +import io.hetu.core.transport.execution.buffer.SerializedPage; +import io.prestosql.exchange.ExchangeManager; +import io.prestosql.exchange.ExchangeSink; +import io.prestosql.exchange.ExchangeSinkInstanceHandle; +import io.prestosql.exchange.FileSystemExchangeConfig; +import io.prestosql.exchange.FileSystemExchangeManager; +import io.prestosql.exchange.FileSystemExchangeSinkHandle; +import io.prestosql.exchange.FileSystemExchangeSinkInstanceHandle; +import io.prestosql.exchange.FileSystemExchangeStats; +import io.prestosql.exchange.storage.FileSystemExchangeStorage; +import io.prestosql.exchange.storage.HetuFileSystemExchangeStorage; +import io.prestosql.execution.StageId; +import io.prestosql.execution.TaskId; +import io.prestosql.operator.PageAssertions; +import io.prestosql.spi.Page; +import io.prestosql.spi.QueryId; +import io.prestosql.spi.block.Block; +import io.prestosql.spi.block.BlockBuilder; +import io.prestosql.testing.TestingPagesSerdeFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.stream.Collectors; + +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.prestosql.metadata.MetadataManager.createTestMetadataManager; +import static io.prestosql.spi.type.IntegerType.INTEGER; +import static org.testng.Assert.assertEquals; + +@Test(singleThreaded = true) +public class TestHybridSpoolingBuffer +{ + private final PagesSerde serde = new TestingPagesSerdeFactory().createPagesSerde(); + private final PagesSerde javaSerde = new PagesSerdeFactory(createTestMetadataManager().getFunctionAndTypeManager().getBlockEncodingSerde(), false) + .createDirectPagesSerde(Optional.empty(), true, false); + private final PagesSerde kryoSerde = new PagesSerdeFactory(createTestMetadataManager().getFunctionAndTypeManager().getBlockKryoEncodingSerde(), false) + .createDirectPagesSerde(Optional.empty(), true, true); + private ExchangeSink exchangeSink; + private ExchangeManager exchangeManager; + private ExchangeSinkInstanceHandle exchangeSinkInstanceHandle; + private final String baseDir = "/tmp/hetu/spooling"; + private final String accessDir = "/tmp/hetu"; + private final Path accessPath = Paths.get(accessDir); + + @BeforeMethod + public void setUp() + throws IOException, InterruptedException + { + Path basePath = Paths.get(baseDir); + File base = new File(baseDir); + if (!base.exists()) { + Files.createDirectories(basePath); + } + else { + deleteDirectory(base); + Files.createDirectories(basePath); + } + } + + @AfterMethod + public void cleanUp() + { + File base = new File(baseDir); + if (base.exists()) { + deleteDirectory(base); + } + } + + private void setConfig(FileSystemExchangeConfig.DirectSerialisationType type) + { + FileSystemExchangeConfig config = new FileSystemExchangeConfig() + .setExchangeEncryptionEnabled(false) + .setDirectSerializationType(type) + .setBaseDirectories(baseDir); + + FileSystemExchangeStorage exchangeStorage = new HetuFileSystemExchangeStorage(); + exchangeStorage.setFileSystemClient(new HetuLocalFileSystemClient(new LocalConfig(new Properties()), accessPath)); + exchangeManager = new FileSystemExchangeManager(exchangeStorage, new FileSystemExchangeStats(), config); + exchangeSinkInstanceHandle = new FileSystemExchangeSinkInstanceHandle( + new FileSystemExchangeSinkHandle(0, Optional.empty(), false), + config.getBaseDirectories().get(0), + 10); + exchangeSink = exchangeManager.createSink(exchangeSinkInstanceHandle, false); + } + + @Test + public void testHybridSpoolingBufferWithSerializationOff() + throws ExecutionException, InterruptedException + { + setConfig(FileSystemExchangeConfig.DirectSerialisationType.OFF); + HybridSpoolingBuffer hybridSpoolingBuffer = createHybridSpoolingBuffer(); + List pages = new ArrayList<>(); + pages.add(generateSerializedPage()); + pages.add(generateSerializedPage()); + hybridSpoolingBuffer.enqueue(0, pages, null); + ListenableFuture result = hybridSpoolingBuffer.get(new OutputBuffers.OutputBufferId(0), 0, new DataSize(100, DataSize.Unit.MEGABYTE)); + while (result.get().equals(BufferResult.emptyResults(0, false))) { + result = hybridSpoolingBuffer.get(new OutputBuffers.OutputBufferId(0), 0, new DataSize(100, DataSize.Unit.MEGABYTE)); + } + List actualPages = result.get().getSerializedPages().stream().map(page -> serde.deserialize(page)).collect(Collectors.toList()); + assertEquals(actualPages.size(), 1); + for (int pageCount = 0; pageCount < actualPages.size(); pageCount++) { + PageAssertions.assertPageEquals(ImmutableList.of(INTEGER, INTEGER), actualPages.get(pageCount), generatePage()); + } + result = hybridSpoolingBuffer.get(new OutputBuffers.OutputBufferId(0), 0, new DataSize(100, DataSize.Unit.MEGABYTE)); + actualPages = result.get().getSerializedPages().stream().map(page -> serde.deserialize(page)).collect(Collectors.toList()); + assertEquals(actualPages.size(), 1); + for (int pageCount = 0; pageCount < actualPages.size(); pageCount++) { + PageAssertions.assertPageEquals(ImmutableList.of(INTEGER, INTEGER), actualPages.get(pageCount), generatePage()); + } + hybridSpoolingBuffer.setNoMorePages(); + } + + @Test + public void testHybridSpoolingBufferWithSerializationJava() + throws ExecutionException, InterruptedException + { + setConfig(FileSystemExchangeConfig.DirectSerialisationType.JAVA); + HybridSpoolingBuffer hybridSpoolingBuffer = createHybridSpoolingBuffer(); + hybridSpoolingBuffer.setSerde(serde); + hybridSpoolingBuffer.setJavaSerde(javaSerde); + hybridSpoolingBuffer.setKryoSerde(kryoSerde); + List pages = new ArrayList<>(); + pages.add(generatePage()); + pages.add(generatePage()); + hybridSpoolingBuffer.enqueuePages(0, pages, null, javaSerde); + ListenableFuture result = hybridSpoolingBuffer.get(new OutputBuffers.OutputBufferId(0), 0, new DataSize(100, DataSize.Unit.MEGABYTE)); + while (result.get().equals(BufferResult.emptyResults(0, false))) { + result = hybridSpoolingBuffer.get(new OutputBuffers.OutputBufferId(0), 0, new DataSize(100, DataSize.Unit.MEGABYTE)); + } + List actualPages = result.get().getSerializedPages().stream().map(page -> serde.deserialize(page)).collect(Collectors.toList()); + assertEquals(actualPages.size(), 1); + for (int pageCount = 0; pageCount < actualPages.size(); pageCount++) { + PageAssertions.assertPageEquals(ImmutableList.of(INTEGER, INTEGER), actualPages.get(pageCount), generatePage()); + } + result = hybridSpoolingBuffer.get(new OutputBuffers.OutputBufferId(0), 0, new DataSize(100, DataSize.Unit.MEGABYTE)); + actualPages = result.get().getSerializedPages().stream().map(page -> serde.deserialize(page)).collect(Collectors.toList()); + assertEquals(actualPages.size(), 1); + for (int pageCount = 0; pageCount < actualPages.size(); pageCount++) { + PageAssertions.assertPageEquals(ImmutableList.of(INTEGER, INTEGER), actualPages.get(pageCount), generatePage()); + } + hybridSpoolingBuffer.setNoMorePages(); + } + + @Test + public void testHybridSpoolingBufferWithSerializationKryo() + throws ExecutionException, InterruptedException + { + setConfig(FileSystemExchangeConfig.DirectSerialisationType.KRYO); + HybridSpoolingBuffer hybridSpoolingBuffer = createHybridSpoolingBuffer(); + hybridSpoolingBuffer.setSerde(serde); + hybridSpoolingBuffer.setJavaSerde(javaSerde); + hybridSpoolingBuffer.setKryoSerde(kryoSerde); + List pages = new ArrayList<>(); + pages.add(generatePage()); + pages.add(generatePage()); + hybridSpoolingBuffer.enqueuePages(0, pages, null, kryoSerde); + ListenableFuture result = hybridSpoolingBuffer.get(new OutputBuffers.OutputBufferId(0), 0, new DataSize(100, DataSize.Unit.MEGABYTE)); + while (result.get().equals(BufferResult.emptyResults(0, false))) { + result = hybridSpoolingBuffer.get(new OutputBuffers.OutputBufferId(0), 0, new DataSize(100, DataSize.Unit.MEGABYTE)); + } + List actualPages = result.get().getSerializedPages().stream().map(page -> serde.deserialize(page)).collect(Collectors.toList()); + assertEquals(actualPages.size(), 1); + for (int pageCount = 0; pageCount < actualPages.size(); pageCount++) { + PageAssertions.assertPageEquals(ImmutableList.of(INTEGER, INTEGER), actualPages.get(pageCount), generatePage()); + } + result = hybridSpoolingBuffer.get(new OutputBuffers.OutputBufferId(0), 0, new DataSize(100, DataSize.Unit.MEGABYTE)); + actualPages = result.get().getSerializedPages().stream().map(page -> serde.deserialize(page)).collect(Collectors.toList()); + assertEquals(actualPages.size(), 1); + for (int pageCount = 0; pageCount < actualPages.size(); pageCount++) { + PageAssertions.assertPageEquals(ImmutableList.of(INTEGER, INTEGER), actualPages.get(pageCount), generatePage()); + } + hybridSpoolingBuffer.setNoMorePages(); + } + + private HybridSpoolingBuffer createHybridSpoolingBuffer() + { + OutputBuffers outputBuffers = OutputBuffers.createInitialEmptyOutputBuffers(OutputBuffers.BufferType.PARTITIONED); + outputBuffers.setExchangeSinkInstanceHandle(exchangeSinkInstanceHandle); + + return new HybridSpoolingBuffer(new OutputBufferStateMachine(new TaskId(new StageId(new QueryId("query"), 0), 0, 0), directExecutor()), + outputBuffers, + exchangeSink, + TestSpoolingExchangeOutputBuffer.TestingLocalMemoryContext::new, + exchangeManager); + } + + private SerializedPage generateSerializedPage() + { + Page expectedPage = generatePage(); + SerializedPage page = serde.serialize(expectedPage); + return page; + } + + private Page generatePage() + { + BlockBuilder expectedBlockBuilder = INTEGER.createBlockBuilder(null, 2); + INTEGER.writeLong(expectedBlockBuilder, 10); + INTEGER.writeLong(expectedBlockBuilder, 20); + Block expectedBlock = expectedBlockBuilder.build(); + + return new Page(expectedBlock, expectedBlock); + } + + private boolean deleteDirectory(File dir) + { + File[] allContents = dir.listFiles(); + if (allContents != null) { + for (File file : allContents) { + deleteDirectory(file); + } + } + return dir.delete(); + } +} diff --git a/presto-main/src/test/java/io/prestosql/execution/buffer/TestSpoolingExchangeOutputBuffer.java b/presto-main/src/test/java/io/prestosql/execution/buffer/TestSpoolingExchangeOutputBuffer.java index b5ad674d3..e0706a856 100644 --- a/presto-main/src/test/java/io/prestosql/execution/buffer/TestSpoolingExchangeOutputBuffer.java +++ b/presto-main/src/test/java/io/prestosql/execution/buffer/TestSpoolingExchangeOutputBuffer.java @@ -24,6 +24,7 @@ import io.hetu.core.transport.execution.buffer.PagesSerde; import io.hetu.core.transport.execution.buffer.SerializedPage; import io.prestosql.exchange.ExchangeSink; import io.prestosql.exchange.ExchangeSinkInstanceHandle; +import io.prestosql.exchange.storage.FileSystemExchangeStorage; import io.prestosql.execution.StageId; import io.prestosql.execution.TaskId; import io.prestosql.memory.context.LocalMemoryContext; @@ -31,6 +32,9 @@ import io.prestosql.spi.Page; import io.prestosql.spi.QueryId; import org.testng.annotations.Test; +import javax.crypto.SecretKey; + +import java.net.URI; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -358,6 +362,36 @@ public class TestSpoolingExchangeOutputBuffer return abort; } + @Override + public FileSystemExchangeStorage getExchangeStorage() + { + return null; + } + + @Override + public URI getOutputDirectory() + { + return null; + } + + @Override + public Optional getSecretKey() + { + return Optional.empty(); + } + + @Override + public boolean isExchangeCompressionEnabled() + { + return false; + } + + @Override + public int getPartitionId() + { + return 0; + } + public void setAbort(CompletableFuture abort) { this.abort = requireNonNull(abort, "abort is null"); @@ -370,7 +404,7 @@ public class TestSpoolingExchangeOutputBuffer INSTANCE } - private static class TestingLocalMemoryContext + protected static class TestingLocalMemoryContext implements LocalMemoryContext { @Override diff --git a/presto-spi/src/main/java/io/prestosql/spi/filesystem/HetuFileSystemClient.java b/presto-spi/src/main/java/io/prestosql/spi/filesystem/HetuFileSystemClient.java index b7eb581f1..9d0bb8497 100644 --- a/presto-spi/src/main/java/io/prestosql/spi/filesystem/HetuFileSystemClient.java +++ b/presto-spi/src/main/java/io/prestosql/spi/filesystem/HetuFileSystemClient.java @@ -210,4 +210,7 @@ public interface HetuFileSystemClient Stream getDirectoryStream(Path path, String prefix, String suffix) throws IOException; + + void flush(OutputStream outputStream) + throws IOException; } -- Gitee