From e0ae6010f82befa911982fb53eda56fb8bb68be1 Mon Sep 17 00:00:00 2001 From: huanglong Date: Sat, 28 Dec 2024 18:18:03 +0800 Subject: [PATCH] shuffle hash fix --- .../execution/ColumnarShuffleExchangeExec.scala | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala index f144be7ce..bb16d39ee 100644 --- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala +++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala @@ -348,20 +348,6 @@ object ColumnarShuffleExchangeExec extends Logging { newIter }, isOrderSensitive = isOrderSensitive) case h@HashPartitioning(expressions, numPartitions) => - //containsRollUp(expressions): Avoid data skew caused by rollup expressions. - //expressions.length > 6: Avoid q11 data skew - //expressions.length == 3: Avoid q28 data skew when the resin rule is enabled. - if (containsRollUp(expressions) || expressions.length > 6 || expressions.length == 3) { - rdd.mapPartitionsWithIndexInternal((_, cbIter) => { - val partitionKeyExtractor: InternalRow => Any = { - val projection = - UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes) - row => projection(row).getInt(0) - } - val newIter = computePartitionId(cbIter, partitionKeyExtractor) - newIter - }, isOrderSensitive = isOrderSensitive) - } else { rdd.mapPartitionsWithIndexInternal((_, cbIter) => { val addPid2ColumnBatch = addPidToColumnBatch() // omni project @@ -399,7 +385,6 @@ object ColumnarShuffleExchangeExec extends Logging { } } }, isOrderSensitive = isOrderSensitive) - } case SinglePartition => rdd.mapPartitionsWithIndexInternal((_, cbIter) => { cbIter.map { cb => (0, cb) } -- Gitee