From 44632c72ef8695cd94f4e34458bf79a1bef1db65 Mon Sep 17 00:00:00 2001 From: Vineet Kumar Maheshwari Date: Thu, 30 Mar 2023 15:58:31 +0530 Subject: [PATCH] Replace LongBigArray with long[] and change temporary data storage in MultiChannelGroupByHash MultiChannelGroupByHash - Temporary output data used to be kept in a list of variable-size pages. That required using a two-level address to point at an exact row. Since the hash table can only store 2^30 positions, a group id fits in a 4-byte int, and because the values page size is now fixed, the group id itself encodes the output row position. This lets us drop the data structures that mapped between group id and output row position, skipping some calculations and reducing the memory footprint from 8+8+4+1 bytes per hash bucket to 4+1 bytes, which improves memory locality. Since the memory footprint of MultiChannelGroupByHash is now smaller, tests using testMemoryReservationYield needed to change: we reserve less memory and therefore yield less often due to the QueryContext.GUARANTEED_MEMORY threshold. BigintGroupByHash - The hash table holds at most 2^30 positions, so ordinary arrays are sufficient. This also slightly reduces the memory footprint. --- .../io/prestosql/operator/BigintGroupBy.java | 11 +- .../prestosql/operator/BigintGroupByHash.java | 93 +++++++-------- .../operator/MultiChannelGroupBy.java | 9 +- .../operator/MultiChannelGroupByHash.java | 110 +++++++----------- .../operator/GroupByHashYieldAssertion.java | 18 ++- .../io/prestosql/operator/TestChannelSet.java | 51 ++++---- .../operator/TestDistinctLimitOperator.java | 10 +- .../prestosql/operator/TestGroupByHash.java | 4 +- .../operator/TestHashAggregationOperator.java | 9 +- .../operator/TestHashSemiJoinOperator.java | 7 +- .../operator/TestMarkDistinctOperator.java | 10 +- .../operator/TestRowNumberOperator.java | 8 +- .../TestTopNRankingNumberOperator.java | 51 ++++---- 13 files changed, 185 insertions(+), 206 deletions(-) diff --git a/presto-main/src/main/java/io/prestosql/operator/BigintGroupBy.java b/presto-main/src/main/java/io/prestosql/operator/BigintGroupBy.java index 94afb0c25..f526f1edf 100644 --- a/presto-main/src/main/java/io/prestosql/operator/BigintGroupBy.java +++ b/presto-main/src/main/java/io/prestosql/operator/BigintGroupBy.java @@ -15,10 +15,11 @@ package io.prestosql.operator; import com.google.common.collect.ImmutableList; import io.prestosql.spi.block.Block; -import io.prestosql.spi.block.BlockBuilder; +import io.prestosql.spi.block.LongArrayBlock; import io.prestosql.spi.type.Type; import java.util.List; +import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; @@ -110,7 +111,7 @@ public abstract class BigintGroupBy protected class GetGroupIdsWork implements Work { - private final BlockBuilder blockBuilder; + private final long[] groupIds; private final Block block; private boolean finished; @@ -121,7 +122,7 @@ public abstract class BigintGroupBy { this.block = requireNonNull(block, "block is null"); // we know the exact size required for the block - this.blockBuilder = BIGINT.createFixedSizeBlockBuilder(block.getPositionCount()); + this.groupIds = new long[block.getPositionCount()]; this.groupBy = groupBy; } @@ -142,7 +143,7 @@ public abstract class BigintGroupBy // Therefore needRehash will not generally return true even if we have just crossed the capacity boundary. 
while (lastPosition < positionCount && !groupBy.needMoreCapacity()) { // output the group id for this row - BIGINT.writeLong(blockBuilder, groupBy.putIfAbsent(lastPosition, block)); + groupIds[lastPosition] = groupBy.putIfAbsent(lastPosition, block); lastPosition++; } return lastPosition == positionCount; @@ -154,7 +155,7 @@ public abstract class BigintGroupBy checkState(lastPosition == block.getPositionCount(), "process has not yet finished"); checkState(!finished, "result has produced"); finished = true; - return new GroupByIdBlock(nextGroupId, blockBuilder.build()); + return new GroupByIdBlock(nextGroupId, new LongArrayBlock(block.getPositionCount(), Optional.empty(), groupIds)); } } } diff --git a/presto-main/src/main/java/io/prestosql/operator/BigintGroupByHash.java b/presto-main/src/main/java/io/prestosql/operator/BigintGroupByHash.java index 5c719de5e..ae5ebca16 100644 --- a/presto-main/src/main/java/io/prestosql/operator/BigintGroupByHash.java +++ b/presto-main/src/main/java/io/prestosql/operator/BigintGroupByHash.java @@ -14,8 +14,6 @@ package io.prestosql.operator; import com.google.common.annotations.VisibleForTesting; -import io.prestosql.array.IntBigArray; -import io.prestosql.array.LongBigArray; import io.prestosql.spi.Page; import io.prestosql.spi.PageBuilder; import io.prestosql.spi.PrestoException; @@ -29,9 +27,11 @@ import io.prestosql.type.BigintOperators; import org.openjdk.jol.info.ClassLayout; import java.io.Serializable; +import java.util.Arrays; import java.util.List; import static com.google.common.base.Preconditions.checkArgument; +import static io.airlift.slice.SizeOf.sizeOf; import static io.prestosql.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES; import static io.prestosql.spi.type.BigintType.BIGINT; import static io.prestosql.type.TypeUtils.NULL_HASH_CODE; @@ -51,11 +51,11 @@ public class BigintGroupByHash private int mask; // the hash table from values to groupIds - private LongBigArray values; - private IntBigArray groupIds; + private long[] values; + private int[] groupIds; // reverse index from the groupId back to the value - private final LongBigArray valuesByGroupId; + private long[] valuesByGroupId; private long hashCollisions; private double expectedHashCollisions; @@ -74,13 +74,11 @@ public class BigintGroupByHash maxFill = calculateMaxFill(hashCapacity); mask = hashCapacity - 1; - values = new LongBigArray(); - values.ensureCapacity(hashCapacity); - groupIds = new IntBigArray(-1); - groupIds.ensureCapacity(hashCapacity); + values = new long[hashCapacity]; + groupIds = new int[hashCapacity]; + Arrays.fill(groupIds, -1); - valuesByGroupId = new LongBigArray(); - valuesByGroupId.ensureCapacity(hashCapacity); + valuesByGroupId = new long[maxFill]; // This interface is used for actively reserving memory (push model) for rehash. 
// The caller can also query memory usage on this object (pull model) @@ -103,9 +101,9 @@ public class BigintGroupByHash public long getEstimatedSize() { return INSTANCE_SIZE + - groupIds.sizeOf() + - values.sizeOf() + - valuesByGroupId.sizeOf() + + sizeOf(groupIds) + + sizeOf(values) + + sizeOf(valuesByGroupId) + preallocatedMemoryInBytes; } @@ -130,7 +128,7 @@ public class BigintGroupByHash blockBuilder.appendNull(); } else { - BIGINT.writeLong(blockBuilder, valuesByGroupId.get(groupId)); + BIGINT.writeLong(blockBuilder, valuesByGroupId[groupId]); } if (outputRawHash) { @@ -139,7 +137,7 @@ public class BigintGroupByHash BIGINT.writeLong(hashBlockBuilder, NULL_HASH_CODE); } else { - BIGINT.writeLong(hashBlockBuilder, BigintOperators.hashCode(valuesByGroupId.get(groupId))); + BIGINT.writeLong(hashBlockBuilder, BigintOperators.hashCode(valuesByGroupId[groupId])); } } } @@ -167,15 +165,15 @@ public class BigintGroupByHash } long value = BIGINT.getLong(block, position); - long hashPosition = getHashPosition(value, mask); + int hashPosition = getHashPosition(value, mask); // look for an empty slot or a slot containing this key while (true) { - int groupId = groupIds.get(hashPosition); + int groupId = groupIds[hashPosition]; if (groupId == -1) { return false; } - if (value == values.get(hashPosition)) { + if (value == values[hashPosition]) { return true; } @@ -187,7 +185,7 @@ public class BigintGroupByHash @Override public long getRawHash(int groupId) { - return BigintType.hash(valuesByGroupId.get(groupId)); + return BigintType.hash(valuesByGroupId[groupId]); } @VisibleForTesting @@ -213,16 +211,16 @@ public class BigintGroupByHash } long value = BIGINT.getLong(block, position); - long hashPosition = getHashPosition(value, mask); + int hashPosition = getHashPosition(value, mask); // look for an empty slot or a slot containing this key while (true) { - int groupId = groupIds.get(hashPosition); + int groupId = groupIds[hashPosition]; if (groupId == -1) { break; } - if (value == values.get(hashPosition)) { + if (value == values[hashPosition]) { return groupId; } @@ -234,14 +232,14 @@ public class BigintGroupByHash return addNewGroup(hashPosition, value); } - private int addNewGroup(long hashPosition, long value) + private int addNewGroup(int hashPosition, long value) { // record group id in hash int groupId = nextGroupId++; - values.set(hashPosition, value); - valuesByGroupId.set(groupId, value); - groupIds.set(hashPosition, groupId); + values[hashPosition] = value; + valuesByGroupId[groupId] = value; + groupIds[hashPosition] = groupId; // increase capacity, if necessary if (needMoreCapacity()) { @@ -271,27 +269,26 @@ public class BigintGroupByHash expectedHashCollisions += estimateNumberOfHashCollisions(getGroupCount(), hashCapacity); int newMask = newCapacity - 1; - LongBigArray newValues = new LongBigArray(); - newValues.ensureCapacity(newCapacity); - IntBigArray newGroupIds = new IntBigArray(-1); - newGroupIds.ensureCapacity(newCapacity); + long[] newValues = new long[newCapacity]; + int[] newGroupIds = new int[newCapacity]; + Arrays.fill(newGroupIds, -1); for (int groupId = 0; groupId < nextGroupId; groupId++) { if (groupId == nullGroupId) { continue; } - long value = valuesByGroupId.get(groupId); + long value = valuesByGroupId[groupId]; // find an empty slot for the address - long hashPosition = getHashPosition(value, newMask); - while (newGroupIds.get(hashPosition) != -1) { + int hashPosition = getHashPosition(value, newMask); + while (newGroupIds[hashPosition] != -1) { hashPosition = 
(hashPosition + 1) & newMask; hashCollisions++; } // record the mapping - newValues.set(hashPosition, value); - newGroupIds.set(hashPosition, groupId); + newValues[hashPosition] = value; + newGroupIds[hashPosition] = groupId; } mask = newMask; @@ -300,13 +297,13 @@ public class BigintGroupByHash values = newValues; groupIds = newGroupIds; - this.valuesByGroupId.ensureCapacity(maxFill); + this.valuesByGroupId = Arrays.copyOf(valuesByGroupId, maxFill); return true; } - private static long getHashPosition(long rawHash, int mask) + private static int getHashPosition(long rawHash, int mask) { - return murmurHash3(rawHash) & mask; + return (int) murmurHash3(rawHash) & mask; } @Override @@ -316,10 +313,10 @@ public class BigintGroupByHash myState.hashCapacity = hashCapacity; myState.maxFill = maxFill; myState.mask = mask; - myState.values = values.capture(serdeProvider); - myState.groupIds = groupIds.capture(serdeProvider); + myState.values = Arrays.copyOf(values, values.length); + myState.groupIds = Arrays.copyOf(groupIds, groupIds.length); myState.nullGroupId = nullGroupId; - myState.valuesByGroupId = valuesByGroupId.capture(serdeProvider); + myState.valuesByGroupId = Arrays.copyOf(valuesByGroupId, valuesByGroupId.length); myState.nextGroupId = nextGroupId; myState.hashCollisions = hashCollisions; myState.expectedHashCollisions = expectedHashCollisions; @@ -335,10 +332,10 @@ public class BigintGroupByHash this.hashCapacity = myState.hashCapacity; this.maxFill = myState.maxFill; this.mask = myState.mask; - this.values.restore(myState.values, serdeProvider); - this.groupIds.restore(myState.groupIds, serdeProvider); + this.values = Arrays.copyOf(myState.values, myState.values.length); + this.groupIds = Arrays.copyOf(myState.groupIds, myState.groupIds.length); this.nullGroupId = myState.nullGroupId; - this.valuesByGroupId.restore(myState.valuesByGroupId, serdeProvider); + this.valuesByGroupId = Arrays.copyOf(myState.valuesByGroupId, myState.valuesByGroupId.length); this.nextGroupId = myState.nextGroupId; this.hashCollisions = myState.hashCollisions; this.expectedHashCollisions = myState.expectedHashCollisions; @@ -352,10 +349,10 @@ public class BigintGroupByHash private int hashCapacity; private int maxFill; private int mask; - private Object values; - private Object groupIds; + private long[] values; + private int[] groupIds; private int nullGroupId; - private Object valuesByGroupId; + private long[] valuesByGroupId; private int nextGroupId; private long hashCollisions; private double expectedHashCollisions; diff --git a/presto-main/src/main/java/io/prestosql/operator/MultiChannelGroupBy.java b/presto-main/src/main/java/io/prestosql/operator/MultiChannelGroupBy.java index 75a9aa784..dab8afa17 100644 --- a/presto-main/src/main/java/io/prestosql/operator/MultiChannelGroupBy.java +++ b/presto-main/src/main/java/io/prestosql/operator/MultiChannelGroupBy.java @@ -21,6 +21,7 @@ import io.prestosql.spi.Page; import io.prestosql.spi.block.Block; import io.prestosql.spi.block.BlockBuilder; import io.prestosql.spi.block.DictionaryBlock; +import io.prestosql.spi.block.LongArrayBlock; import io.prestosql.spi.block.RunLengthEncodedBlock; import io.prestosql.spi.snapshot.BlockEncodingSerdeProvider; import io.prestosql.spi.snapshot.Restorable; @@ -439,7 +440,7 @@ public abstract class MultiChannelGroupBy protected class GetDictionaryGroupIdsWork implements Work { - private final BlockBuilder blockBuilder; + private final long[] groupIds; private final Page page; private final Page dictionaryPage; private final 
DictionaryBlock dictionaryBlock; @@ -458,7 +459,7 @@ public abstract class MultiChannelGroupBy this.dictionaryPage = createPageWithExtractedDictionary(page); // we know the exact size required for the block - this.blockBuilder = BIGINT.createFixedSizeBlockBuilder(page.getPositionCount()); + this.groupIds = new long[page.getPositionCount()]; this.groupBy = groupBy; } @@ -480,7 +481,7 @@ public abstract class MultiChannelGroupBy while (lastPosition < positionCount && !groupBy.needMoreCapacity()) { int positionInDictionary = dictionaryBlock.getId(lastPosition); int groupId = getGroupId(hashGenerator, dictionaryPage, positionInDictionary, groupBy); - BIGINT.writeLong(blockBuilder, groupId); + groupIds[lastPosition] = groupId; lastPosition++; } return lastPosition == positionCount; @@ -492,7 +493,7 @@ public abstract class MultiChannelGroupBy checkState(lastPosition == page.getPositionCount(), "process has not yet finished"); checkState(!finished, "result has produced"); finished = true; - return new GroupByIdBlock(groupBy.getGroupCount(), blockBuilder.build()); + return new GroupByIdBlock(groupBy.getGroupCount(), new LongArrayBlock(page.getPositionCount(), Optional.empty(), groupIds)); } } diff --git a/presto-main/src/main/java/io/prestosql/operator/MultiChannelGroupByHash.java b/presto-main/src/main/java/io/prestosql/operator/MultiChannelGroupByHash.java index 532f42a0e..4f8e12c30 100644 --- a/presto-main/src/main/java/io/prestosql/operator/MultiChannelGroupByHash.java +++ b/presto-main/src/main/java/io/prestosql/operator/MultiChannelGroupByHash.java @@ -18,7 +18,6 @@ import io.airlift.slice.DynamicSliceOutput; import io.airlift.slice.Slice; import io.airlift.slice.SliceOutput; import io.airlift.slice.Slices; -import io.prestosql.array.LongBigArray; import io.prestosql.spi.Page; import io.prestosql.spi.PageBuilder; import io.prestosql.spi.PrestoException; @@ -36,8 +35,6 @@ import java.util.Optional; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkState; import static io.airlift.slice.SizeOf.sizeOf; -import static io.prestosql.operator.SyntheticAddress.decodePosition; -import static io.prestosql.operator.SyntheticAddress.decodeSliceIndex; import static io.prestosql.operator.SyntheticAddress.encodeSyntheticAddress; import static io.prestosql.spi.StandardErrorCode.GENERIC_INSUFFICIENT_RESOURCES; import static io.prestosql.spi.type.BigintType.BIGINT; @@ -55,6 +52,9 @@ public class MultiChannelGroupByHash { private static final int INSTANCE_SIZE = ClassLayout.parseClass(MultiChannelGroupByHash.class).instanceSize(); private static final float FILL_RATIO = 0.75f; + private static final int VALUES_PAGE_BITS = 14; // 16k positions + private static final int VALUES_PAGE_MAX_ROW_COUNT = 1 << VALUES_PAGE_BITS; + private static final int VALUES_PAGE_MASK = VALUES_PAGE_MAX_ROW_COUNT - 1; private PageBuilder currentPageBuilder; @@ -63,12 +63,11 @@ public class MultiChannelGroupByHash private int hashCapacity; private int maxFill; private int mask; - private long[] groupAddressByHash; + // Group ids are assigned incrementally. Therefore, since values page size is constant and power of two, + // the group id is also an address (slice index and position within slice) to group row in channelBuilders. 
private int[] groupIdsByHash; private byte[] rawHashByHashPosition; - private final LongBigArray groupAddressByGroupId; - private int nextGroupId; private long hashCollisions; private double expectedHashCollisions; @@ -94,15 +93,11 @@ public class MultiChannelGroupByHash maxFill = calculateMaxFill(hashCapacity); mask = hashCapacity - 1; - groupAddressByHash = new long[hashCapacity]; - Arrays.fill(groupAddressByHash, -1); rawHashByHashPosition = new byte[hashCapacity]; groupIdsByHash = new int[hashCapacity]; - - groupAddressByGroupId = new LongBigArray(); - groupAddressByGroupId.ensureCapacity(maxFill); + Arrays.fill(groupIdsByHash, -1); // This interface is used for actively reserving memory (push model) for rehash. // The caller can also query memory usage on this object (pull model) @@ -112,9 +107,8 @@ public class MultiChannelGroupByHash @Override public long getRawHash(int groupId) { - long address = groupAddressByGroupId.get(groupId); - int blockIndex = decodeSliceIndex(address); - int position = decodePosition(address); + int blockIndex = groupId >> VALUES_PAGE_BITS; + int position = groupId & VALUES_PAGE_MASK; return hashStrategy.hashPosition(blockIndex, position); } @@ -125,9 +119,7 @@ public class MultiChannelGroupByHash (sizeOf(channelBuilders.get(0).elements()) * channelBuilders.size()) + completedPagesMemorySize + currentPageBuilder.getRetainedSizeInBytes() + - sizeOf(groupAddressByHash) + sizeOf(groupIdsByHash) + - groupAddressByGroupId.sizeOf() + sizeOf(rawHashByHashPosition) + preallocatedMemoryInBytes; } @@ -159,9 +151,8 @@ public class MultiChannelGroupByHash @Override public void appendValuesTo(int groupId, PageBuilder pageBuilder, int outputChannelOffset) { - long address = groupAddressByGroupId.get(groupId); - int blockIndex = decodeSliceIndex(address); - int position = decodePosition(address); + int blockIndex = groupId >> VALUES_PAGE_BITS; + int position = groupId & VALUES_PAGE_MASK; hashStrategy.appendTo(blockIndex, position, pageBuilder, outputChannelOffset); } @@ -203,11 +194,11 @@ public class MultiChannelGroupByHash @Override public boolean contains(int position, Page page, int[] hashChannels, long rawHash) { - int hashPosition = (int) getHashPosition(rawHash, mask); + int hashPosition = getHashPosition(rawHash, mask); // look for a slot containing this key - while (groupAddressByHash[hashPosition] != -1) { - if (positionNotDistinctFromCurrentRow(groupAddressByHash[hashPosition], hashPosition, position, page, (byte) rawHash, hashChannels)) { + while (groupIdsByHash[hashPosition] != -1) { + if (positionNotDistinctFromCurrentRow(groupIdsByHash[hashPosition], hashPosition, position, page, (byte) rawHash, hashChannels)) { // found an existing slot for this key return true; } @@ -233,12 +224,12 @@ public class MultiChannelGroupByHash public int putIfAbsent(int position, Page page, long rawHash) { - int hashPosition = (int) getHashPosition(rawHash, mask); + int hashPosition = getHashPosition(rawHash, mask); // look for an empty slot or a slot containing this key int groupId = -1; - while (groupAddressByHash[hashPosition] != -1) { - if (positionNotDistinctFromCurrentRow(groupAddressByHash[hashPosition], hashPosition, position, page, (byte) rawHash, channels)) { + while (groupIdsByHash[hashPosition] != -1) { + if (positionNotDistinctFromCurrentRow(groupIdsByHash[hashPosition], hashPosition, position, page, (byte) rawHash, channels)) { // found an existing slot for this key groupId = groupIdsByHash[hashPosition]; @@ -275,13 +266,11 @@ public class MultiChannelGroupByHash 
// record group id in hash int groupId = nextGroupId++; - groupAddressByHash[hashPosition] = address; rawHashByHashPosition[hashPosition] = (byte) rawHash; groupIdsByHash[hashPosition] = groupId; - groupAddressByGroupId.set(groupId, address); // create new page builder if this page is full - if (currentPageBuilder.isFull()) { + if (currentPageBuilder.getPositionCount() == VALUES_PAGE_MAX_ROW_COUNT) { startNewPage(); } @@ -321,10 +310,9 @@ public class MultiChannelGroupByHash int newCapacity = toIntExact(newCapacityLong); // An estimate of how much extra memory is needed before we can go ahead and expand the hash table. - // This includes the new capacity for groupAddressByHash, rawHashByHashPosition, groupIdsByHash, and groupAddressByGroupId as well as the size of the current page - preallocatedMemoryInBytes = (newCapacity - hashCapacity) * (long) (Long.BYTES + Integer.BYTES + Byte.BYTES) + - (calculateMaxFill(newCapacity) - maxFill) * Long.BYTES + - currentPageSizeInBytes; + // This includes the new capacity for rawHashByHashPosition, groupIdsByHash as well as the size of the current page + preallocatedMemoryInBytes = (newCapacity - hashCapacity) * (long) (Integer.BYTES + Byte.BYTES) + + currentPageSizeInBytes; if (!updateMemory.update()) { // reserved memory but has exceeded the limit return false; @@ -334,54 +322,46 @@ public class MultiChannelGroupByHash expectedHashCollisions += estimateNumberOfHashCollisions(getGroupCount(), hashCapacity); int newMask = newCapacity - 1; - long[] newKey = new long[newCapacity]; byte[] rawHashes = new byte[newCapacity]; - Arrays.fill(newKey, -1); - int[] newValue = new int[newCapacity]; + int[] newGroupIdHash = new int[newCapacity]; + Arrays.fill(newGroupIdHash, -1); - int oldIndex = 0; - for (int groupId = 0; groupId < nextGroupId; groupId++) { + for (int i = 0; i < hashCapacity; i++) { // seek to the next used slot - while (groupAddressByHash[oldIndex] == -1) { - oldIndex++; + int groupId = groupIdsByHash[i]; + if (groupId == -1) { + continue; } - // get the address for this slot - long address = groupAddressByHash[oldIndex]; - - long rawHash = hashPosition(address); + long rawHash = hashPosition(groupId); // find an empty slot for the address - int pos = (int) getHashPosition(rawHash, newMask); - while (newKey[pos] != -1) { + int pos = getHashPosition(rawHash, newMask); + while (newGroupIdHash[pos] != -1) { pos = (pos + 1) & newMask; hashCollisions++; } // record the mapping - newKey[pos] = address; rawHashes[pos] = (byte) rawHash; - newValue[pos] = groupIdsByHash[oldIndex]; - oldIndex++; + newGroupIdHash[pos] = groupId; } this.mask = newMask; this.hashCapacity = newCapacity; this.maxFill = calculateMaxFill(newCapacity); - this.groupAddressByHash = newKey; this.rawHashByHashPosition = rawHashes; - this.groupIdsByHash = newValue; - groupAddressByGroupId.ensureCapacity(maxFill); + this.groupIdsByHash = newGroupIdHash; return true; } - private long hashPosition(long sliceAddress) + private long hashPosition(int groupId) { - int sliceIndex = decodeSliceIndex(sliceAddress); - int position = decodePosition(sliceAddress); + int blockIndex = groupId >> VALUES_PAGE_BITS; + int blockPosition = groupId & VALUES_PAGE_MASK; if (precomputedHashChannel.isPresent()) { - return getRawHash(sliceIndex, position); + return getRawHash(blockIndex, blockPosition); } - return hashStrategy.hashPosition(sliceIndex, position); + return hashStrategy.hashPosition(blockIndex, blockPosition); } private long getRawHash(int sliceIndex, int position) @@ -389,17 +369,19 @@ public 
class MultiChannelGroupByHash return channelBuilders.get(precomputedHashChannel.getAsInt()).get(sliceIndex).getLong(position, 0); } - private boolean positionNotDistinctFromCurrentRow(long address, int hashPosition, int position, Page page, byte rawHash, int[] hashChannels) + private boolean positionNotDistinctFromCurrentRow(int groupId, int hashPosition, int position, Page page, byte rawHash, int[] hashChannels) { if (rawHashByHashPosition[hashPosition] != rawHash) { return false; } - return hashStrategy.positionNotDistinctFromRow(decodeSliceIndex(address), decodePosition(address), position, page, hashChannels); + int blockIndex = groupId >> VALUES_PAGE_BITS; + int blockPosition = groupId & VALUES_PAGE_MASK; + return hashStrategy.positionNotDistinctFromRow(blockIndex, blockPosition, position, page, hashChannels); } - private static long getHashPosition(long rawHash, int mask) + private static int getHashPosition(long rawHash, int mask) { - return murmurHash3(rawHash) & mask; + return (int) murmurHash3(rawHash) & mask; } private static int calculateMaxFill(int hashSize) @@ -424,10 +406,8 @@ public class MultiChannelGroupByHash myState.hashCapacity = hashCapacity; myState.maxFill = maxFill; myState.mask = mask; - myState.groupAddressByHash = Arrays.copyOf(groupAddressByHash, groupAddressByHash.length); myState.groupIdsByHash = Arrays.copyOf(groupIdsByHash, groupIdsByHash.length); myState.rawHashByHashPosition = Arrays.copyOf(rawHashByHashPosition, rawHashByHashPosition.length); - myState.groupAddressByGroupId = groupAddressByGroupId.capture(serdeProvider); myState.nextGroupId = nextGroupId; if (dictionaryLookBack != null) { @@ -464,12 +444,9 @@ public class MultiChannelGroupByHash this.hashCapacity = myState.hashCapacity; this.maxFill = myState.maxFill; this.mask = myState.mask; - this.groupAddressByHash = myState.groupAddressByHash; this.groupIdsByHash = myState.groupIdsByHash; this.rawHashByHashPosition = myState.rawHashByHashPosition; - this.groupAddressByGroupId.restore(myState.groupAddressByGroupId, serdeProvider); - this.nextGroupId = myState.nextGroupId; if (myState.dictionaryLookBack != null) { Slice input = Slices.wrappedBuffer(((DictionaryLookBack.DictionaryLookBackState) myState.dictionaryLookBack).dictionary); @@ -507,12 +484,9 @@ public class MultiChannelGroupByHash private int hashCapacity; private int maxFill; private int mask; - private long[] groupAddressByHash; private int[] groupIdsByHash; private byte[] rawHashByHashPosition; - private Object groupAddressByGroupId; - private int nextGroupId; private Object dictionaryLookBack; private long hashCollisions; diff --git a/presto-main/src/test/java/io/prestosql/operator/GroupByHashYieldAssertion.java b/presto-main/src/test/java/io/prestosql/operator/GroupByHashYieldAssertion.java index d36399578..91f7ba694 100644 --- a/presto-main/src/test/java/io/prestosql/operator/GroupByHashYieldAssertion.java +++ b/presto-main/src/test/java/io/prestosql/operator/GroupByHashYieldAssertion.java @@ -166,7 +166,7 @@ public final class GroupByHashYieldAssertion } else { // groupAddressByHash, groupIdsByHash, and rawHashByHashPosition double by hashCapacity; while groupAddressByGroupId double by maxFill = hashCapacity / 0.75 - expectedReservedExtraBytes = oldCapacity * (long) (Long.BYTES * 1.75 + Integer.BYTES + Byte.BYTES) + page.getRetainedSizeInBytes(); + expectedReservedExtraBytes = oldCapacity * (long) (Integer.BYTES + Byte.BYTES); } assertBetweenInclusive(actualIncreasedMemory, expectedReservedExtraBytes, expectedReservedExtraBytes + 
additionalMemoryInBytes); @@ -188,10 +188,24 @@ // Assert the estimated reserved memory before rehash is very close to the one after rehash long rehashedMemoryUsage = operator.getOperatorContext().getDriverContext().getMemoryUsage(); - assertBetweenInclusive(rehashedMemoryUsage * 1.0 / newMemoryUsage, 0.99, 1.01); + double memoryUsageErrorUpperBound = 1.01; + double memoryUsageError = rehashedMemoryUsage * 1.0 / newMemoryUsage; + if (memoryUsageError > memoryUsageErrorUpperBound) { + // Usually the error is < 1%, but since MultiChannelGroupByHash.getEstimatedSize + // accounts for changes in completedPagesMemorySize, which is increased if a new page is + // added by addNewGroup (an event that cannot be predicted, as it depends on the number of unique groups + // in the current page being processed), the difference includes the size of the added new page. + // Lower bound is 1% lower than normal because additionalMemoryInBytes also includes aggregator state. + assertBetweenInclusive(rehashedMemoryUsage * 1.0 / (newMemoryUsage + additionalMemoryInBytes), 0.86, memoryUsageErrorUpperBound, + "rehashedMemoryUsage " + rehashedMemoryUsage + ", newMemoryUsage: " + newMemoryUsage); + } + else { + assertBetweenInclusive(memoryUsageError, 0.99, memoryUsageErrorUpperBound); + } // unblocked assertTrue(operator.needsInput()); + assertTrue(operator.getOperatorContext().isWaitingForMemory().isDone()); } } diff --git a/presto-main/src/test/java/io/prestosql/operator/TestChannelSet.java b/presto-main/src/test/java/io/prestosql/operator/TestChannelSet.java index a1bd13827..410b2ea93 100644 --- a/presto-main/src/test/java/io/prestosql/operator/TestChannelSet.java +++ b/presto-main/src/test/java/io/prestosql/operator/TestChannelSet.java @@ -23,6 +23,7 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -104,9 +105,16 @@ public class TestChannelSet private Map createChannelSetExpectedMapping() { Map groupByHashMapping = new HashMap<>(); - Map valuesMapping = new HashMap<>(); - Map groupIdsMapping = new HashMap<>(); - Map valuesByGroupIdMapping = new HashMap<>(); + List valuesMapping = new ArrayList<>(8); + List groupIdsMapping = new ArrayList<>(8); + List valuesByGroupIdMapping = new ArrayList<>(6); + for (int i = 0; i < 8; i++) { + valuesMapping.add(0L); + groupIdsMapping.add(-1); + } + for (int i = 0; i < 6; i++) { + valuesByGroupIdMapping.add(0L); + } groupByHashMapping.put("hashCapacity", 8); groupByHashMapping.put("currentPageSizeInBytes", 220L); @@ -121,18 +129,6 @@ public class TestChannelSet groupByHashMapping.put("expectedHashCollisions", 0.0); groupByHashMapping.put("preallocatedMemoryInBytes", 0L); - valuesMapping.put("array", long[][].class); - valuesMapping.put("capacity", 1024); - valuesMapping.put("segments", 1); - - groupIdsMapping.put("array", int[][].class); - groupIdsMapping.put("capacity", 1024); - groupIdsMapping.put("segments", 1); - - valuesByGroupIdMapping.put("array", long[][].class); - valuesByGroupIdMapping.put("capacity", 1024); - valuesByGroupIdMapping.put("segments", 1); - return groupByHashMapping; } @@ -140,9 +136,16 @@ { Map expectedMapping = new HashMap<>(); Map groupByHashMapping = new HashMap<>(); - Map valuesMapping = new HashMap<>(); - Map groupIdsMapping = new HashMap<>(); - Map valuesByGroupIdMapping = new HashMap<>(); + List 
valuesMapping = new ArrayList<>(8); + List groupIdsMapping = new ArrayList<>(8); + List valuesByGroupIdMapping = new ArrayList<>(6); + for (int i = 0; i < 8; i++) { + valuesMapping.add(0L); + groupIdsMapping.add(-1); + } + for (int i = 0; i < 6; i++) { + valuesByGroupIdMapping.add(0L); + } expectedMapping.put("hash", groupByHashMapping); @@ -159,18 +162,6 @@ public class TestChannelSet groupByHashMapping.put("expectedHashCollisions", 0.0); groupByHashMapping.put("preallocatedMemoryInBytes", 0L); - valuesMapping.put("array", long[][].class); - valuesMapping.put("capacity", 1024); - valuesMapping.put("segments", 1); - - groupIdsMapping.put("array", int[][].class); - groupIdsMapping.put("capacity", 1024); - groupIdsMapping.put("segments", 1); - - valuesByGroupIdMapping.put("array", long[][].class); - valuesByGroupIdMapping.put("capacity", 1024); - valuesByGroupIdMapping.put("segments", 1); - expectedMapping.put("operatorContext", 0); expectedMapping.put("localMemoryContext", 0L); diff --git a/presto-main/src/test/java/io/prestosql/operator/TestDistinctLimitOperator.java b/presto-main/src/test/java/io/prestosql/operator/TestDistinctLimitOperator.java index e3955c258..eec0c577e 100644 --- a/presto-main/src/test/java/io/prestosql/operator/TestDistinctLimitOperator.java +++ b/presto-main/src/test/java/io/prestosql/operator/TestDistinctLimitOperator.java @@ -34,7 +34,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.prestosql.RowPagesBuilder.rowPagesBuilder; import static io.prestosql.SessionTestUtils.TEST_SESSION; import static io.prestosql.metadata.MetadataManager.createTestMetadataManager; @@ -136,7 +136,7 @@ public class TestDistinctLimitOperator { Map expectedMapping = new HashMap<>(); expectedMapping.put("operatorContext", 0); - expectedMapping.put("localUserMemoryContext", 33048L); + expectedMapping.put("localUserMemoryContext", 424L); expectedMapping.put("remainingLimit", 2L); expectedMapping.put("finishing", false); expectedMapping.put("nextDistinctId", 3L); @@ -198,9 +198,9 @@ public class TestDistinctLimitOperator Optional.of(1), joinCompiler); - GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((DistinctLimitOperator) operator).getCapacity(), 1_400_000); - assertGreaterThan(result.getYieldCount(), 5); - assertGreaterThan(result.getMaxReservedBytes(), 20L << 20); + GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((DistinctLimitOperator) operator).getCapacity(), 1_200_000); + assertGreaterThanOrEqual(result.getYieldCount(), 5); + assertGreaterThanOrEqual(result.getMaxReservedBytes(), 20L << 20); assertEquals(result.getOutput().stream().mapToInt(Page::getPositionCount).sum(), 6_000 * 600); } } diff --git a/presto-main/src/test/java/io/prestosql/operator/TestGroupByHash.java b/presto-main/src/test/java/io/prestosql/operator/TestGroupByHash.java index ab129e22d..133522cd5 100644 --- a/presto-main/src/test/java/io/prestosql/operator/TestGroupByHash.java +++ b/presto-main/src/test/java/io/prestosql/operator/TestGroupByHash.java @@ -153,6 +153,9 @@ public class TestGroupByHash expectedMapping.put("expectedHashCollisions", 858.42432); 
expectedMapping.put("preallocatedMemoryInBytes", 0L); expectedMapping.put("currentPageSizeInBytes", 1072L); + expectedMapping.put("valuesByGroupId", long[].class); + expectedMapping.put("values", long[].class); + expectedMapping.put("groupIds", int[].class); return expectedMapping; } @@ -324,7 +327,6 @@ public class TestGroupByHash expectedMapping.put("hashCapacity", 256); expectedMapping.put("maxFill", 192); expectedMapping.put("mask", 255); - expectedMapping.put("groupAddressByHash", long[].class); expectedMapping.put("groupIdsByHash", int[].class); expectedMapping.put("rawHashByHashPosition", byte[].class); expectedMapping.put("nextGroupId", 100); diff --git a/presto-main/src/test/java/io/prestosql/operator/TestHashAggregationOperator.java b/presto-main/src/test/java/io/prestosql/operator/TestHashAggregationOperator.java index 731a0027d..3150bf2c0 100644 --- a/presto-main/src/test/java/io/prestosql/operator/TestHashAggregationOperator.java +++ b/presto-main/src/test/java/io/prestosql/operator/TestHashAggregationOperator.java @@ -66,6 +66,7 @@ import static io.airlift.slice.SizeOf.SIZE_OF_DOUBLE; import static io.airlift.slice.SizeOf.SIZE_OF_LONG; import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder; import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.airlift.units.DataSize.Unit.BYTE; import static io.airlift.units.DataSize.Unit.KILOBYTE; import static io.airlift.units.DataSize.Unit.MEGABYTE; @@ -326,7 +327,7 @@ public class TestHashAggregationOperator //TODO-cp-I2DSGQ: change expectedMapping after implementation of operatorContext capture expectedMapping.put("operatorContext", 0); expectedMapping.put("aggregationBuilder", aggregationBuilderMapping); - expectedMapping.put("memoryContext", 10675419L); + expectedMapping.put("memoryContext", 7138412L); expectedMapping.put("inputProcessed", true); expectedMapping.put("finishing", false); expectedMapping.put("finished", false); @@ -658,9 +659,9 @@ public class TestHashAggregationOperator // get result with yield; pick a relatively small buffer for aggregator's memory usage GroupByHashYieldResult result; - result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, this::getHashCapacity, 1_400_000); - assertGreaterThan(result.getYieldCount(), 5); - assertGreaterThan(result.getMaxReservedBytes(), 20L << 20); + result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, this::getHashCapacity, 450_000); + assertGreaterThanOrEqual(result.getYieldCount(), 5); + assertGreaterThanOrEqual(result.getMaxReservedBytes(), 20L << 20); int count = 0; for (Page page : result.getOutput()) { diff --git a/presto-main/src/test/java/io/prestosql/operator/TestHashSemiJoinOperator.java b/presto-main/src/test/java/io/prestosql/operator/TestHashSemiJoinOperator.java index 19469c999..20b9afaa8 100644 --- a/presto-main/src/test/java/io/prestosql/operator/TestHashSemiJoinOperator.java +++ b/presto-main/src/test/java/io/prestosql/operator/TestHashSemiJoinOperator.java @@ -40,7 +40,6 @@ import java.util.concurrent.ScheduledExecutorService; import static com.google.common.collect.Iterables.concat; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.airlift.testing.Assertions.assertGreaterThan; import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.airlift.units.DataSize.Unit.BYTE; import static io.prestosql.RowPagesBuilder.rowPagesBuilder; @@ -324,10 +323,10 @@ 
public class TestHashSemiJoinOperator type, setBuilderOperatorFactory, operator -> ((SetBuilderOperator) operator).getCapacity(), - 1_400_000); + 1_200_000); - assertGreaterThanOrEqual(result.getYieldCount(), 5); - assertGreaterThan(result.getMaxReservedBytes(), 20L << 20); + assertGreaterThanOrEqual(result.getYieldCount(), 4); + assertGreaterThanOrEqual(result.getMaxReservedBytes(), 20L << 19); assertEquals(result.getOutput().stream().mapToInt(Page::getPositionCount).sum(), 0); } diff --git a/presto-main/src/test/java/io/prestosql/operator/TestMarkDistinctOperator.java b/presto-main/src/test/java/io/prestosql/operator/TestMarkDistinctOperator.java index 89635b25b..f848355c2 100644 --- a/presto-main/src/test/java/io/prestosql/operator/TestMarkDistinctOperator.java +++ b/presto-main/src/test/java/io/prestosql/operator/TestMarkDistinctOperator.java @@ -35,7 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import static io.airlift.concurrent.Threads.daemonThreadsNamed; -import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.prestosql.RowPagesBuilder.rowPagesBuilder; import static io.prestosql.SessionTestUtils.TEST_SESSION; import static io.prestosql.metadata.MetadataManager.createTestMetadataManager; @@ -131,7 +131,7 @@ public class TestMarkDistinctOperator { Map expectedMapping = new HashMap<>(); expectedMapping.put("operatorContext", 0); - expectedMapping.put("localUserMemoryContext", 361496L); + expectedMapping.put("localUserMemoryContext", 295048L); expectedMapping.put("finishing", false); return expectedMapping; } @@ -144,9 +144,9 @@ public class TestMarkDistinctOperator OperatorFactory operatorFactory = new MarkDistinctOperatorFactory(0, new PlanNodeId("test"), ImmutableList.of(type), ImmutableList.of(0), Optional.of(1), joinCompiler); // get result with yield; pick a relatively small buffer for partitionRowCount's memory usage - GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((MarkDistinctOperator) operator).getCapacity(), 1_400_000); - assertGreaterThan(result.getYieldCount(), 5); - assertGreaterThan(result.getMaxReservedBytes(), 20L << 20); + GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((MarkDistinctOperator) operator).getCapacity(), 1_200_000); + assertGreaterThanOrEqual(result.getYieldCount(), 5); + assertGreaterThanOrEqual(result.getMaxReservedBytes(), 20L << 20); int count = 0; for (Page page : result.getOutput()) { diff --git a/presto-main/src/test/java/io/prestosql/operator/TestRowNumberOperator.java b/presto-main/src/test/java/io/prestosql/operator/TestRowNumberOperator.java index feecce8c8..4a40871e6 100644 --- a/presto-main/src/test/java/io/prestosql/operator/TestRowNumberOperator.java +++ b/presto-main/src/test/java/io/prestosql/operator/TestRowNumberOperator.java @@ -40,7 +40,7 @@ import java.util.concurrent.ScheduledExecutorService; import static com.google.common.collect.ImmutableList.toImmutableList; import static io.airlift.concurrent.Threads.daemonThreadsNamed; import static io.airlift.testing.Assertions.assertEqualsIgnoreOrder; -import static io.airlift.testing.Assertions.assertGreaterThan; +import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; import static io.prestosql.RowPagesBuilder.rowPagesBuilder; import static 
io.prestosql.SessionTestUtils.TEST_SESSION; import static io.prestosql.metadata.MetadataManager.createTestMetadataManager; @@ -232,9 +232,9 @@ public class TestRowNumberOperator joinCompiler); // get result with yield; pick a relatively small buffer for partitionRowCount's memory usage - GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((RowNumberOperator) operator).getCapacity(), 1_400_000); - assertGreaterThan(result.getYieldCount(), 5); - assertGreaterThan(result.getMaxReservedBytes(), 20L << 20); + GroupByHashYieldAssertion.GroupByHashYieldResult result = finishOperatorWithYieldingGroupByHash(input, type, operatorFactory, operator -> ((RowNumberOperator) operator).getCapacity(), 1_200_000); + assertGreaterThanOrEqual(result.getYieldCount(), 5); + assertGreaterThanOrEqual(result.getMaxReservedBytes(), 20L << 20); int count = 0; for (Page page : result.getOutput()) { diff --git a/presto-main/src/test/java/io/prestosql/operator/TestTopNRankingNumberOperator.java b/presto-main/src/test/java/io/prestosql/operator/TestTopNRankingNumberOperator.java index 3ad81d765..f8aa70cd1 100644 --- a/presto-main/src/test/java/io/prestosql/operator/TestTopNRankingNumberOperator.java +++ b/presto-main/src/test/java/io/prestosql/operator/TestTopNRankingNumberOperator.java @@ -29,6 +29,7 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -308,13 +309,21 @@ public class TestTopNRankingNumberOperator Map operatorSnapshotMapping = new HashMap<>(); operatorSnapshotMapping.put("operatorContext", 0); - operatorSnapshotMapping.put("localUserMemoryContext", 51816L); + operatorSnapshotMapping.put("localUserMemoryContext", 19192L); //TopNRankingNumberOperator.groupByHash Map groupByHashMapping = new HashMap<>(); - Map values = new HashMap<>(); - Map groupIds = new HashMap<>(); - Map valuesByGroupId = new HashMap<>(); + List values = Arrays.asList(0L, 0L, 0L, 0L, 0L, 0L, 0L, 2L, 0L, 0L, 0L, 0L, 1L, 0L, 3L, 0L); + List groupIds = Arrays.asList(-1, -1, -1, -1, -1, -1, -1, 1, -1, -1, -1, -1, 0, -1, 2, -1); + List valuesByGroupId = new ArrayList<>(6); + + valuesByGroupId.add(1L); + valuesByGroupId.add(2L); + valuesByGroupId.add(3L); + for (int i = 0; i < 9; i++) { + valuesByGroupId.add(0L); + } + operatorSnapshotMapping.put("groupByHash", groupByHashMapping); groupByHashMapping.put("hashCapacity", 16); groupByHashMapping.put("maxFill", 12); @@ -328,15 +337,6 @@ public class TestTopNRankingNumberOperator groupByHashMapping.put("expectedHashCollisions", 0.0); groupByHashMapping.put("preallocatedMemoryInBytes", 0L); groupByHashMapping.put("currentPageSizeInBytes", 356L); - values.put("array", long[][].class); - values.put("capacity", 1024); - values.put("segments", 1); - groupIds.put("array", int[][].class); - groupIds.put("capacity", 1024); - groupIds.put("segments", 1); - valuesByGroupId.put("array", long[][].class); - valuesByGroupId.put("capacity", 1024); - valuesByGroupId.put("segments", 1); //TopNRankingNumberOperator.groupedTopNBuilder Map groupedTopNBuilderMapping = new HashMap<>(); @@ -368,13 +368,21 @@ public class TestTopNRankingNumberOperator Map operatorSnapshotMapping = new HashMap<>(); operatorSnapshotMapping.put("operatorContext", 0); - operatorSnapshotMapping.put("localUserMemoryContext", 51812L); + operatorSnapshotMapping.put("localUserMemoryContext", 19188L); 
//TopNRankingNumberOperator.groupByHash Map groupByHashMapping = new HashMap<>(); - Map values = new HashMap<>(); - Map groupIds = new HashMap<>(); - Map valuesByGroupId = new HashMap<>(); + List values = Arrays.asList(0L, 0L, 0L, 0L, 0L, 0L, 0L, 2L, 0L, 0L, 0L, 0L, 1L, 0L, 3L, 0L); + List groupIds = Arrays.asList(-1, -1, -1, -1, -1, -1, -1, 1, -1, -1, -1, -1, 0, -1, 2, -1); + List valuesByGroupId = new ArrayList<>(6); + + valuesByGroupId.add(1L); + valuesByGroupId.add(2L); + valuesByGroupId.add(3L); + for (int i = 0; i < 9; i++) { + valuesByGroupId.add(0L); + } + operatorSnapshotMapping.put("groupByHash", groupByHashMapping); groupByHashMapping.put("hashCapacity", 16); groupByHashMapping.put("maxFill", 12); @@ -388,15 +396,6 @@ public class TestTopNRankingNumberOperator groupByHashMapping.put("expectedHashCollisions", 0.0); groupByHashMapping.put("preallocatedMemoryInBytes", 0L); groupByHashMapping.put("currentPageSizeInBytes", 356L); - values.put("array", long[][].class); - values.put("capacity", 1024); - values.put("segments", 1); - groupIds.put("array", int[][].class); - groupIds.put("capacity", 1024); - groupIds.put("segments", 1); - valuesByGroupId.put("array", long[][].class); - valuesByGroupId.put("capacity", 1024); - valuesByGroupId.put("segments", 1); //TopNRankingNumberOperator.groupedTopNBuilder Map groupedTopNBuilderMapping = new HashMap<>(); -- Gitee
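
For reference, the MultiChannelGroupByHash change above hinges on one fact: every completed values page now holds exactly 2^14 rows, so a group id can be split into a page (slice) index and a position within that page using two bit operations, which is what replaces the old groupAddressByHash / groupAddressByGroupId lookups. The following standalone Java sketch, which is not part of the patch, mirrors the VALUES_PAGE_BITS / VALUES_PAGE_MASK arithmetic introduced by the change; the class and method names are illustrative only.

public final class GroupIdAddressingSketch
{
    // Constants mirror those added to MultiChannelGroupByHash in this patch
    private static final int VALUES_PAGE_BITS = 14;                        // 16K rows per values page
    private static final int VALUES_PAGE_MAX_ROW_COUNT = 1 << VALUES_PAGE_BITS;
    private static final int VALUES_PAGE_MASK = VALUES_PAGE_MAX_ROW_COUNT - 1;

    private GroupIdAddressingSketch() {}

    // Index of the values page (slice) that holds the row for this group id
    static int pageIndex(int groupId)
    {
        return groupId >> VALUES_PAGE_BITS;
    }

    // Position of the row within that page
    static int pagePosition(int groupId)
    {
        return groupId & VALUES_PAGE_MASK;
    }

    public static void main(String[] args)
    {
        int groupId = 40_000; // hypothetical group id: 40_000 = 2 * 16_384 + 7_232
        System.out.printf("groupId %d -> page %d, position %d%n",
                groupId, pageIndex(groupId), pagePosition(groupId));
        // prints: groupId 40000 -> page 2, position 7232
    }
}

Because group ids are assigned sequentially and a new page is started only once the current page reaches VALUES_PAGE_MAX_ROW_COUNT rows, this decoding is valid for every group id the hash hands out, which is why the separate address arrays could be removed.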