From f1eb496a4d60cf4808cbbcc539e7284738245e8f Mon Sep 17 00:00:00 2001 From: anllick <654610542@qq.com> Date: Thu, 18 Apr 2024 10:37:26 +0800 Subject: [PATCH 1/5] optimize mapjoin --- .../boostkit/hive/OmniMapJoinOperator.java | 220 +++++++++--------- 1 file changed, 112 insertions(+), 108 deletions(-) diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java index 0ae33715f..9fff88495 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniMapJoinOperator.java @@ -48,8 +48,6 @@ import org.apache.hadoop.hive.ql.exec.ExprNodeColumnEvaluator; import org.apache.hadoop.hive.ql.exec.ExprNodeConstantEvaluator; import org.apache.hadoop.hive.ql.exec.ExprNodeEvaluator; import org.apache.hadoop.hive.ql.exec.MapredContext; -import org.apache.hadoop.hive.ql.exec.ObjectCache; -import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.exec.Utilities; @@ -70,7 +68,6 @@ import org.apache.hadoop.hive.ql.plan.MapJoinDesc; import org.apache.hadoop.hive.ql.plan.OperatorDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.plan.api.OperatorType; -import org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.ByteStream; @@ -108,7 +105,6 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; import java.util.TreeMap; -import java.util.concurrent.Future; import java.util.stream.Collectors; public class OmniMapJoinOperator extends AbstractMapJoinOperator @@ -147,12 +143,11 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator private transient List[] buildVecs; - private transient String cacheKey; private transient MapJoinTableContainerSerDe[] mapJoinTableSerdes; - private transient ObjectCache cache; - private static List> vecBatchNeedClose = new ArrayList<>(); private static boolean addedCloseThread; private transient Iterator output; + private static int buildNodeId; + private static Map countShareBuildIds = new HashMap<>(); public OmniMapJoinOperator() { super(); @@ -198,48 +193,68 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator buildInspectors[buildIndex] = new ArrayList<>(joinKeysObjectInspectors[buildIndex]); buildInspectors[buildIndex].addAll(joinValuesObjectInspectors[buildIndex]); } - JoinType joinType = JOIN_TYPE_MAP.get(condn[Math.min(posBigTable, condn.length -1)].getType()); + JoinType joinType = JOIN_TYPE_MAP.get(condn[Math.min(posBigTable, condn.length - 1)].getType()); DataType[] buildTypes = getTypeFromInspectors(Arrays.stream(buildInspectors).filter(Objects::nonNull) .flatMap(List::stream).collect(Collectors.toList())); - omniHashBuilderWithExprOperatorFactory = getOmniHashBuilderWithExprOperatorFactory(joinType, buildTypes, - buildIndexes.get(buildIndexes.size() - 1)); - buildOperator = omniHashBuilderWithExprOperatorFactory.createOperator(); + String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID); + buildNodeId = Math.abs((queryId + this.getOperatorId()).hashCode()); + boolean hasCache = false; + if (!conf.isDynamicPartitionHashJoin()) { + OmniHashBuilderWithExprOperatorFactory.gLock.lock(); + try { + omniHashBuilderWithExprOperatorFactory = OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(buildNodeId); + Integer countShareBuildId = countShareBuildIds.getOrDefault(buildNodeId, 0); + countShareBuildIds.put(buildNodeId, ++countShareBuildId); + if (omniHashBuilderWithExprOperatorFactory == null) { + omniHashBuilderWithExprOperatorFactory = getOmniHashBuilderWithExprOperatorFactory(joinType, buildTypes, + buildIndexes.get(buildIndexes.size() - 1)); + buildOperator = omniHashBuilderWithExprOperatorFactory.createOperator(); + OmniHashBuilderWithExprOperatorFactory.saveHashBuilderOperatorAndFactory(buildNodeId, + OmniHashBuilderWithExprOperatorFactory.getPartitionId(buildNodeId), + omniHashBuilderWithExprOperatorFactory, buildOperator); + } else { + hasCache = true; + } + } catch (Exception e) { + throw new RuntimeException("hash build failed. errmsg: " + e.getMessage()); + } finally { + OmniHashBuilderWithExprOperatorFactory.gLock.unlock(); + } + } else { + omniHashBuilderWithExprOperatorFactory = getOmniHashBuilderWithExprOperatorFactory(joinType, buildTypes, + buildIndexes.get(buildIndexes.size() - 1)); + buildOperator = omniHashBuilderWithExprOperatorFactory.createOperator(); + } omniLookupJoinWithExprOperatorFactory = getOmniLookupOperatorFactory(omniHashBuilderWithExprOperatorFactory, posBigTable, buildIndexes, buildTypes); joinOperator = omniLookupJoinWithExprOperatorFactory.createOperator(); order = getOrder(posBigTable, joinKeysObjectInspectors.length); initKeyPosToValuePos(); - String queryId = HiveConf.getVar(hconf, HiveConf.ConfVars.HIVEQUERYID); - cacheKey = "HASH_MAP_" + this.getOperatorId() + "_container"; - cache = ObjectCacheFactory.getCache(hconf, queryId, false); final ExecMapperContext mapContext = getExecContext(); final MapredContext mrContext = MapredContext.get(); mapJoinTableSerdes = new MapJoinTableContainerSerDe[conf.getTagLength()]; generateMapMetaData(); - if (!conf.isBucketMapJoin() && !conf.isDynamicPartitionHashJoin()) { - Future>> future = cache.retrieveAsync(cacheKey, - () -> loadBuildVec(mapContext, mrContext)); - asyncInitOperations.add(future); - } else if (!isInputFileChangeSensitive(mapContext)) { - List> vecBatchList = loadBuildVec(mapContext, mrContext); - for (int i = 0; i < vecBatchList.size(); i++) { - buildVecs[buildIndexes.get(i)] = vecBatchList.get(i); - } + + boolean canLoadCache = !conf.isBucketMapJoin() && !conf.isDynamicPartitionHashJoin() && hasCache; + if (!canLoadCache && !isInputFileChangeSensitive(mapContext)) { + loadBuildVec(mapContext, mrContext); } if (!addedCloseThread) { Runtime.getRuntime().addShutdownHook(new Thread(() -> { try { - for (List vecBatchList : vecBatchNeedClose) { - if (vecBatchList != null) { - for (VecBatch vecBatch : vecBatchList) { - vecBatch.releaseAllVectors(); - vecBatch.close(); - } + for (Map.Entry entry : countShareBuildIds.entrySet()) { + Integer value = entry.getValue(); + for (int i = 0; i < value; i++) { + OmniHashBuilderWithExprOperatorFactory.dereferenceHashBuilderOperatorAndFactory(entry.getKey()); + } + if (OmniHashBuilderWithExprOperatorFactory.getHashBuilderOperatorFactory(entry.getKey()) == null) { + LOG.info("release operatorFactor of buildNodeId = " + entry.getKey() + " succeed"); + } else { + LOG.error("release operatorFactor of buildNodeId = " + entry.getKey() + " failed"); } } - LOG.info("release vecBatchNeedClose succeed"); } catch (Exception e) { - LOG.error("release vecBatchNeedClose failed", e); + LOG.error("release operatorFactor failed", e); } })); addedCloseThread = true; @@ -266,22 +281,77 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator inputObjInspectors = newInputObjInspectors; } - private List> loadBuildVec(ExecMapperContext mapContext, MapredContext mrContext) + private void loadBuildVec(ExecMapperContext mapContext, MapredContext mrContext) throws HiveException { List> vecBatches = new ArrayList<>(); for (int index : buildIndexes) { vecBatches.add(getVectorFromMap(index, mapContext, mrContext)); } - if (!conf.isDynamicPartitionHashJoin()) { - LOG.info("add vecBatchNeedClose succeed"); - vecBatchNeedClose.addAll(vecBatches); + + for (int i = 0; i < vecBatches.size(); i++) { + buildVecs[buildIndexes.get(i)] = vecBatches.get(i); + } + + if (joinKeysObjectInspectors.length > 2) { + // There may be many build tables, need to join build tables first, and use the + // result as input of buildOperator + List buildList = new ArrayList<>(); + List probeVec; + OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories = new OmniHashBuilderWithExprOperatorFactory[buildIndexes.size() - 1]; + OmniOperatorFactory[] innerJoinOperatorFactories = new OmniOperatorFactory[buildIndexes.size() - 1]; + OmniOperator[] innerBuildOperators = new OmniOperator[buildIndexes.size() - 1]; + OmniOperator[] innerJoinOperators = new OmniOperator[buildIndexes.size() - 1]; + for (int i = buildIndexes.size() - 2; i >= 0; i--) { + buildList.add(0, buildIndexes.get(i + 1)); + int probeIndex = buildIndexes.get(i); + DataType[] buildTypes = getTypeFromInspectors(buildList.stream() + .flatMap(index -> buildInspectors[index].stream()).collect(Collectors.toList())); + JoinType joinType = JOIN_TYPE_MAP.get(condn[i + 1].getType()); + innerBuildFactories[i] = getOmniHashBuilderWithExprOperatorFactory( + joinType, buildTypes, buildList.get(0)); + innerBuildOperators[i] = innerBuildFactories[i].createOperator(); + innerJoinOperatorFactories[i] = getInnerOmniLookupOperatorFactory(innerBuildFactories[i], + probeIndex, buildList, buildTypes); + innerJoinOperators[i] = innerJoinOperatorFactories[i].createOperator(); + } + for (int i = innerBuildFactories.length - 1; i >= 0; i--) { + int probeIndex = buildIndexes.get(i); + probeVec = getVectorFromCache(probeIndex); + if (i == innerBuildFactories.length - 1) { + List buildVec = getVectorFromCache(buildIndexes.get(buildIndexes.size() - 1)); + for (VecBatch vecBatch : buildVec) { + innerBuildOperators[i].addInput(vecBatch); + } + } + innerBuildOperators[i].getOutput(); + for (VecBatch vecBatch : probeVec) { + innerJoinOperators[i].addInput(vecBatch); + output = innerJoinOperators[i].getOutput(); + while (output.hasNext()) { + if (i == 0) { + buildOperator.addInput(output.next()); + } else { + innerBuildOperators[i - 1].addInput(output.next()); + } + } + } + } + closeInnerOperators(innerBuildOperators, innerJoinOperators, innerBuildFactories, innerJoinOperatorFactories); + buildOperator.getOutput(); + } else { + List cacheVecBatches = getVectorFromCache(1 - posBigTable); + for (VecBatch vecBatch : cacheVecBatches) { + if (vecBatch.getVectorCount() > 0) { + buildOperator.addInput(vecBatch); + } + } + buildOperator.getOutput(); } - return vecBatches; } private boolean isInputFileChangeSensitive(ExecMapperContext mapContext) { return !(mapContext == null || mapContext.getLocalWork() == null - || mapContext.getLocalWork().getInputFileChangeSensitive() == false); + || !mapContext.getLocalWork().getInputFileChangeSensitive()); } private List getVectorFromMap(int pos, ExecMapperContext mapContext, MapredContext mrContext) @@ -598,71 +668,6 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } } - @Override - protected void completeInitializationOp(Object[] os) throws HiveException { - if (os.length > 0) { - List> vecBatchList = (List>) os[0]; - for (int i = 0; i < vecBatchList.size(); i++) { - buildVecs[buildIndexes.get(i)] = vecBatchList.get(i); - } - } - if (joinKeysObjectInspectors.length > 2) { - // There may be many build tables, need to join build tables first, and use the - // result as input of buildOperator - List buildList = new ArrayList<>(); - List probeVec; - OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories = new OmniHashBuilderWithExprOperatorFactory[buildIndexes.size() - 1]; - OmniOperatorFactory[] innerJoinOperatorFactories = new OmniOperatorFactory[buildIndexes.size() - 1]; - OmniOperator[] innerBuildOperators = new OmniOperator[buildIndexes.size() - 1]; - OmniOperator[] innerJoinOperators = new OmniOperator[buildIndexes.size() - 1]; - for (int i = buildIndexes.size() - 2; i >= 0; i--) { - buildList.add(0, buildIndexes.get(i + 1)); - int probeIndex = buildIndexes.get(i); - DataType[] buildTypes = getTypeFromInspectors(buildList.stream() - .flatMap(index -> buildInspectors[index].stream()).collect(Collectors.toList())); - JoinType joinType = JOIN_TYPE_MAP.get(condn[i + 1].getType()); - innerBuildFactories[i] = getOmniHashBuilderWithExprOperatorFactory( - joinType, buildTypes, buildList.get(0)); - innerBuildOperators[i] = innerBuildFactories[i].createOperator(); - innerJoinOperatorFactories[i] = getInnerOmniLookupOperatorFactory(innerBuildFactories[i], - probeIndex, buildList, buildTypes); - innerJoinOperators[i] = innerJoinOperatorFactories[i].createOperator(); - } - for (int i = innerBuildFactories.length - 1; i >= 0; i--) { - int probeIndex = buildIndexes.get(i); - probeVec = getVectorFromCache(probeIndex); - if (i == innerBuildFactories.length - 1) { - List buildVec = getVectorFromCache(buildIndexes.get(buildIndexes.size() - 1)); - for (VecBatch vecBatch : buildVec) { - innerBuildOperators[i].addInput(vecBatch); - } - } - innerBuildOperators[i].getOutput(); - for (VecBatch vecBatch : probeVec) { - innerJoinOperators[i].addInput(vecBatch); - output = innerJoinOperators[i].getOutput(); - while (output.hasNext()) { - if (i == 0) { - buildOperator.addInput(output.next()); - } else { - innerBuildOperators[i - 1].addInput(output.next()); - } - } - } - } - closeInnerOperators(innerBuildOperators, innerJoinOperators, innerBuildFactories, innerJoinOperatorFactories); - buildOperator.getOutput(); - } else { - List vecBatches = getVectorFromCache(1 - posBigTable); - for (VecBatch vecBatch : vecBatches) { - if (vecBatch.getVectorCount() > 0) { - buildOperator.addInput(vecBatch); - } - } - buildOperator.getOutput(); - } - } - private void closeInnerOperators(OmniOperator[] innerBuildOperators, OmniOperator[] innerJoinOperators, OmniHashBuilderWithExprOperatorFactory[] innerBuildFactories, OmniOperatorFactory[] innerJoinOperatorFactories) { for (OmniOperator operator : innerBuildOperators) { @@ -680,10 +685,7 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator } private List getVectorFromCache(int index) { - if (conf.isDynamicPartitionHashJoin()) { - return buildVecs[index]; - } - return buildVecs[index].stream().map(OmniHiveOperator::copyVecBatch).collect(Collectors.toList()); + return buildVecs[index]; } private int[] getOrder(int posBigTable, int aliasNum) { @@ -875,9 +877,11 @@ public class OmniMapJoinOperator extends AbstractMapJoinOperator @Override public void closeOp(boolean abort) throws HiveException { joinOperator.close(); - buildOperator.close(); omniLookupJoinWithExprOperatorFactory.close(); - omniHashBuilderWithExprOperatorFactory.close(); + if (conf.isDynamicPartitionHashJoin()) { + buildOperator.close(); + omniHashBuilderWithExprOperatorFactory.close(); + } output = null; super.closeOp(abort); } -- Gitee From c2e16796e7e9625a9cd419da848e357cbfbeabae Mon Sep 17 00:00:00 2001 From: zhangyuxi <1434187877@qq.com> Date: Fri, 10 May 2024 14:16:21 +0800 Subject: [PATCH 2/5] filter support between expression --- .../hive/expression/BetweenExpression.java | 53 +++++++++++++++++++ .../processor/BetweenExpressionProcessor.java | 24 ++------- 2 files changed, 57 insertions(+), 20 deletions(-) create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/expression/BetweenExpression.java diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/expression/BetweenExpression.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/expression/BetweenExpression.java new file mode 100644 index 000000000..ec0f66519 --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/expression/BetweenExpression.java @@ -0,0 +1,53 @@ +package com.huawei.boostkit.hive.expression; + +import com.google.gson.Gson; + +import javax.annotation.Nullable; + +public class BetweenExpression extends BaseExpression{ + + @Nullable + private BaseExpression lower_bound; + @Nullable + private BaseExpression upper_bound; + @Nullable + private BaseExpression value; + + public BetweenExpression(String exprType, Integer returnType, String operator) { + super(exprType, returnType, operator); + } + + public BetweenExpression(String exprType, Integer returnType, String operator, BaseExpression lower_bound, + BaseExpression upper_bound, BaseExpression value) { + super(exprType, returnType, operator); + this.lower_bound = lower_bound; + this.upper_bound = upper_bound; + this.value = value; + } + + @Override + public String toString() { + Gson gson = new Gson(); + return gson.toJson(this); + } + + @Override + public void add(BaseExpression node) { + + } + + @Override + public boolean isFull() { + return false; + } + + @Override + public void setLocated(Located located) { + + } + + @Override + public Located getLocated() { + return null; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/processor/BetweenExpressionProcessor.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/processor/BetweenExpressionProcessor.java index 10e0f510e..6f9a09c20 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/processor/BetweenExpressionProcessor.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/processor/BetweenExpressionProcessor.java @@ -19,10 +19,9 @@ package com.huawei.boostkit.hive.processor; import com.huawei.boostkit.hive.expression.BaseExpression; -import com.huawei.boostkit.hive.expression.CompareExpression; +import com.huawei.boostkit.hive.expression.BetweenExpression; import com.huawei.boostkit.hive.expression.ExpressionUtils; import com.huawei.boostkit.hive.expression.TypeUtils; -import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; @@ -34,19 +33,6 @@ public class BetweenExpressionProcessor implements ExpressionProcessor { @Override public BaseExpression process(ExprNodeGenericFuncDesc node, String operator, ObjectInspector inspector) { - - ExprNodeConstantDesc exprNodeDesc = (ExprNodeConstantDesc) node.getChildren().get(0); - boolean not = (boolean) exprNodeDesc.getValue(); - - CompareExpression leaf = new CompareExpression("BINARY", - TypeUtils.convertHiveTypeToOmniType(node.getTypeInfo()), - not ? "OR" : "AND"); - leaf.add(new CompareExpression("BINARY", - TypeUtils.convertHiveTypeToOmniType(node.getTypeInfo()), - not ? "LESS_THAN" : "GREATER_THAN_OR_EQUAL")); - leaf.add(new CompareExpression("BINARY", - TypeUtils.convertHiveTypeToOmniType(node.getTypeInfo()), - not ? "GREATER_THAN" : "LESS_THAN_OR_EQUAL")); BaseExpression subLeaf; ExprNodeDesc comparedNode = node.getChildren().get(1); if (node.getChildren().get(1) instanceof ExprNodeGenericFuncDesc) { @@ -54,11 +40,9 @@ public class BetweenExpressionProcessor implements ExpressionProcessor { } else { subLeaf = ExpressionUtils.createNode(comparedNode, inspector); } - leaf.add(subLeaf); - leaf.add(createBoundNode(node, comparedNode, LOWER_BOUND_INDEX, inspector)); - leaf.add(subLeaf); - leaf.add(createBoundNode(node, comparedNode, UPPER_BOUND_INDEX, inspector)); - return leaf; + return new BetweenExpression("BETWEEN", TypeUtils.convertHiveTypeToOmniType(node.getTypeInfo()), + null, createBoundNode(node, comparedNode, LOWER_BOUND_INDEX, inspector), + createBoundNode(node, comparedNode, UPPER_BOUND_INDEX, inspector), subLeaf); } private BaseExpression createBoundNode(ExprNodeGenericFuncDesc rootNode, -- Gitee From 02ca7c7f9f43236f808614745e7c858603cba9f6 Mon Sep 17 00:00:00 2001 From: zhangyuxi <1434187877@qq.com> Date: Wed, 8 May 2024 11:36:18 +0800 Subject: [PATCH 3/5] shuffle support topN --- .../hive/OmniExecuteWithHookContext.java | 8 +- .../hive/shuffle/FixedWidthColumnSerDe.java | 4 +- .../FixedWidthColumnSortableSerde.java | 85 +++++++++++++++++++ .../hive/shuffle/OmniVecBatchSerDe.java | 51 ++++++++++- 4 files changed, 142 insertions(+), 6 deletions(-) create mode 100644 omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSortableSerde.java diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java index d72763c00..cf6621b28 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniExecuteWithHookContext.java @@ -338,8 +338,10 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { // is VectorOperator reducer = (Operator) reducer.getChildOperators().get(0); } - if (reducer.getType().equals(OperatorType.GROUPBY) && isReplaceable(reducer, true)) { - tezWork.getEdgeProperty(work, child).setEdgeType(TezEdgeProperty.EdgeType.CUSTOM_SIMPLE_EDGE); + if (isReplaceable(reducer, true)) { + if (reducer.getType().equals(OperatorType.GROUPBY)) { + tezWork.getEdgeProperty(work, child).setEdgeType(TezEdgeProperty.EdgeType.CUSTOM_SIMPLE_EDGE); + } return true; } return false; @@ -386,7 +388,7 @@ public class OmniExecuteWithHookContext implements ExecuteWithHookContext { unReplaceable = true; } - if (reduceSinkDesc.getTopN() != -1 || !reduceSinkDesc.getDistinctColumnIndices().isEmpty()) { + if (!reduceSinkDesc.getDistinctColumnIndices().isEmpty()) { unReplaceable = true; } for (ExprNodeDesc exprNodeDesc : reduceSinkDesc.getPartitionCols()) { diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java index f212711b7..2de677dd7 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSerDe.java @@ -19,8 +19,8 @@ package com.huawei.boostkit.hive.shuffle; public class FixedWidthColumnSerDe implements ColumnSerDe { - private int columnTypeLen; - private static byte[] EMPTY = new byte[16]; + protected int columnTypeLen; + protected static byte[] EMPTY = new byte[16]; public FixedWidthColumnSerDe(int columnTypeLen) { this.columnTypeLen = columnTypeLen; diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSortableSerde.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSortableSerde.java new file mode 100644 index 000000000..92291e3eb --- /dev/null +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/FixedWidthColumnSortableSerde.java @@ -0,0 +1,85 @@ +package com.huawei.boostkit.hive.shuffle; + +/* + * Copyright (C) 2023-2024. Huawei Technologies Co., Ltd. All rights reserved. + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +public class FixedWidthColumnSortableSerde extends FixedWidthColumnSerDe { + private transient boolean columnSortOrderIsDesc; + private transient byte columnNullMarker; + private transient byte columnNotNullMarker; + + public FixedWidthColumnSortableSerde(int columnTypeLen, byte columnNullMarker, byte columnNotNullMarker) { + super(columnTypeLen); + this.columnSortOrderIsDesc = columnSortOrderIsDesc; + this.columnNullMarker = columnNullMarker; + this.columnNotNullMarker = columnNotNullMarker; + } + + @Override + public int serialize(byte[] writeBytes, VecWrapper vecWrapper, int offset) { + int index = vecWrapper.index; + if (vecWrapper.isNull[index] == 1) { + writeBytes[offset] = -1; + ++offset; + return offset; + } + int valueLen = trimBytes(vecWrapper.value, index * columnTypeLen, columnTypeLen); + writeBytes[offset] = (byte) valueLen; + ++offset; + int startOffset = offset; + // write value array + System.arraycopy(vecWrapper.value, index * columnTypeLen, writeBytes, offset, valueLen); + offset = offset + valueLen; + int endOffset = offset; + for (int i = startOffset; i < endOffset; i++) { + writeBytes[i] = (byte) (0xff ^ writeBytes[i]); + } + return offset; + } + + @Override + public int deserialize(VecSerdeBody vecSerdeBody, byte[] bytes, int offset) { + if (bytes[offset] == -1) { + vecSerdeBody.isNull = 1; + ++offset; + System.arraycopy(EMPTY, 0, vecSerdeBody.value, 0, columnTypeLen); + return offset; + } + vecSerdeBody.isNull = 0; + int length = bytes[offset]; + ++offset; + for (int i = offset; i < offset + length; i++) { + bytes[i] = (byte) (0xff ^ bytes[i]); + } + System.arraycopy(bytes, offset, vecSerdeBody.value, 0, length); + System.arraycopy(EMPTY, 0, vecSerdeBody.value, length, columnTypeLen - length); + offset = offset + length; + return offset; + } + + private int trimBytes(byte[] bytes, int start, int length) { + int index = 0; + for (int i = length - 1; i >= 0; i--) { + if (bytes[start + i] != 0) { + index = i; + break; + } + } + return index + 1; + } +} diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java index fc34aa65e..ea098e71a 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/shuffle/OmniVecBatchSerDe.java @@ -19,6 +19,7 @@ package com.huawei.boostkit.hive.shuffle; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; @@ -63,6 +64,13 @@ public class OmniVecBatchSerDe extends AbstractSerDe { private transient ColumnSerDe[] columnSerDes; + private transient boolean[] columnSortOrderIsDesc; + private transient byte[] columnNullMarker; + private transient byte[] columnNotNullMarker; + + public static final byte ZERO = (byte) 0; + public static final byte ONE = (byte) 1; + public static final Map TYPE_LEN = new HashMap() { { put(PrimitiveObjectInspector.PrimitiveCategory.BYTE, 1); @@ -108,6 +116,43 @@ public class OmniVecBatchSerDe extends AbstractSerDe { this.stats = new SerDeStats(); this.lastOperationSerialize = false; this.lastOperationDeserialize = false; + + // Get the sort order + String columnSortOrder = properties.getProperty(serdeConstants.SERIALIZATION_SORT_ORDER); + columnSortOrderIsDesc = new boolean[columnNames.size()]; + for (int i = 0; i < columnSortOrderIsDesc.length; i++) { + columnSortOrderIsDesc[i] = (columnSortOrder != null && columnSortOrder.charAt(i) == '-'); + } + + // NULL first/last + String columnNullOrder = properties.getProperty(serdeConstants.SERIALIZATION_NULL_SORT_ORDER); + columnNullMarker = new byte[columnNames.size()]; + columnNotNullMarker = new byte[columnNames.size()]; + for (int i = 0; i < columnSortOrderIsDesc.length; i++) { + if (columnSortOrderIsDesc[i]) { + // Descending + if (columnNullOrder != null && columnNullOrder.charAt(i) == 'a') { + // Null first + columnNullMarker[i] = ONE; + columnNotNullMarker[i] = ZERO; + } else { + // Null last (default for descending order) + columnNullMarker[i] = ZERO; + columnNotNullMarker[i] = ONE; + } + } else { + // Ascending + if (columnNullOrder != null && columnNullOrder.charAt(i) != 'z') { + // Null last + columnNullMarker[i] = ONE; + columnNotNullMarker[i] = ZERO; + } else { + // Null first (default for ascending order) + columnNullMarker[i] = ZERO; + columnNotNullMarker[i] = ONE; + } + } + } initialSerializeParam(); } @@ -150,7 +195,11 @@ public class OmniVecBatchSerDe extends AbstractSerDe { columnSerDes[i] = new VariableWidthColumnSerDe(); } else { writeLen = writeLen + columnTypeLen[i] + 1; - columnSerDes[i] = new FixedWidthColumnSerDe(columnTypeLen[i]); + if (columnSortOrderIsDesc[i]) { + columnSerDes[i] = new FixedWidthColumnSortableSerde(columnTypeLen[i], columnNullMarker[i], columnNotNullMarker[i]); + } else { + columnSerDes[i] = new FixedWidthColumnSerDe(columnTypeLen[i]); + } } } return writeLen; -- Gitee From 657313e5d9c0e74aa7cc71b3a8637cb7744f2654 Mon Sep 17 00:00:00 2001 From: zhangyuxi <1434187877@qq.com> Date: Mon, 25 Mar 2024 19:39:52 +0800 Subject: [PATCH 4/5] optimized for expandVecBatch --- .../boostkit/hive/OmniGroupByOperator.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java index 5b34ee27c..abf6b6158 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java @@ -20,6 +20,8 @@ package com.huawei.boostkit.hive; import static com.huawei.boostkit.hive.expression.TypeUtils.buildInputDataType; import static nova.hetu.omniruntime.constants.FunctionType.OMNI_AGGREGATION_TYPE_COUNT_ALL; +import static nova.hetu.omniruntime.type.DataType.DataTypeId.OMNI_CHAR; +import static nova.hetu.omniruntime.type.DataType.DataTypeId.OMNI_VARCHAR; import static org.apache.hadoop.hive.ql.exec.GroupByOperator.groupingSet2BitSet; import static org.apache.hadoop.hive.ql.exec.GroupByOperator.shouldEmitSummaryRow; @@ -47,6 +49,7 @@ import nova.hetu.omniruntime.vector.VarcharVec; import nova.hetu.omniruntime.vector.Vec; import nova.hetu.omniruntime.vector.VecBatch; import nova.hetu.omniruntime.vector.VecFactory; +import org.apache.arrow.vector.VarCharVector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.common.type.HiveDecimal; @@ -509,14 +512,13 @@ public class OmniGroupByOperator extends OmniHiveOperator imple int rowCount = vec.getSize(); int groupingSetSize = groupingSets.size(); Vec newVec = VecFactory.createFlatVec(rowCount * groupingSetSize, vec.getType()); - Vec flatVec = vec; - if (vec instanceof DictionaryVec) { - flatVec = ((DictionaryVec) vec).expandDictionary(); - } + Vec flatVec = (vec instanceof DictionaryVec) ? ((DictionaryVec) vec).expandDictionary() : vec; + byte[] rawValueNulls = vec.getRawValueNulls(); + DataType.DataTypeId dataTypeId = vec.getType().getId(); + int[] rawValueOffset = (dataTypeId == OMNI_VARCHAR || dataTypeId == OMNI_CHAR) ? ((VarcharVec) flatVec).getRawValueOffset() : new int[0]; for (int i = 0; i < groupingSetSize; i++) { + newVec.setNulls(i * rowCount, rawValueNulls, 0, rowCount); if ((groupingSets.get(i) & mask) == 0) { - DataType.DataTypeId dataTypeId = vec.getType().getId(); - newVec.setNulls(i * rowCount, vec.getValuesNulls(0, rowCount), 0, rowCount); switch (dataTypeId) { case OMNI_INT: case OMNI_DATE32: @@ -543,14 +545,14 @@ public class OmniGroupByOperator extends OmniHiveOperator imple case OMNI_VARCHAR: case OMNI_CHAR: ((VarcharVec) newVec).put(i * rowCount, ((VarcharVec) flatVec).get(0, rowCount), 0, - ((VarcharVec) flatVec).getValueOffset(0, rowCount), 0, rowCount); + rawValueOffset, 0, rowCount); break; default: throw new RuntimeException("Not support dataType, dataTypeId: " + dataTypeId); } } else { - boolean[] nulls = new boolean[rowCount]; - Arrays.fill(nulls, true); + byte[] nulls = new byte[rowCount]; + Arrays.fill(nulls, (byte) 1); newVec.setNulls(i * rowCount, nulls, 0, rowCount); } } -- Gitee From a154e0204a95a4d72af0a1e4d30b633630f77e81 Mon Sep 17 00:00:00 2001 From: zhangyuxi <1434187877@qq.com> Date: Mon, 25 Mar 2024 19:48:14 +0800 Subject: [PATCH 5/5] optimized for expandVecBatch --- .../main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java index abf6b6158..d88ffc24b 100644 --- a/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java +++ b/omnioperator/omniop-hive-extension/src/main/java/com/huawei/boostkit/hive/OmniGroupByOperator.java @@ -49,7 +49,6 @@ import nova.hetu.omniruntime.vector.VarcharVec; import nova.hetu.omniruntime.vector.Vec; import nova.hetu.omniruntime.vector.VecBatch; import nova.hetu.omniruntime.vector.VecFactory; -import org.apache.arrow.vector.VarCharVector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.type.Date; import org.apache.hadoop.hive.common.type.HiveDecimal; -- Gitee