From c24c02b2cf2634bcc5943c92e2d1a818cac85c95 Mon Sep 17 00:00:00 2001
From: maxiaoqi2020
Date: Thu, 21 Jul 2022 15:48:27 +0800
Subject: [PATCH 1/2] AQE feature support for omnioperator

---
 .../boostkit/spark/ColumnarPlugin.scala       |  57 ++++-
 .../ColumnarBroadcastExchangeExec.scala       |  10 +-
 .../spark/sql/execution/ColumnarExec.scala    |  53 -----
 .../ColumnarShuffleExchangeExec.scala         |   8 +-
 .../ColumnarCustomShuffleReaderExec.scala     | 209 ++++++++++++++++++
 5 files changed, 272 insertions(+), 65 deletions(-)
 create mode 100644 omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala

diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
index 74bbe2e5a..7c2763638 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
@@ -23,9 +23,9 @@ import org.apache.spark.sql.catalyst.expressions.DynamicPruningSubquery
 import org.apache.spark.sql.catalyst.expressions.aggregate.Partial
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowToOmniColumnarExec, _}
-import org.apache.spark.sql.execution.adaptive.QueryStageExec
+import org.apache.spark.sql.execution.adaptive.{ColumnarCustomShuffleReaderExec, CustomShuffleReaderExec, QueryStageExec, ShuffleQueryStageExec}
 import org.apache.spark.sql.execution.aggregate.HashAggregateExec
-import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ShuffleExchangeExec}
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, Exchange, ReusedExchangeExec, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.execution.window.WindowExec
 import org.apache.spark.sql.internal.SQLConf
@@ -58,6 +58,13 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
 
   def setAdaptiveSupport(enable: Boolean): Unit = { isSupportAdaptive = enable }
 
+  def checkBhjRightChild(x: Any): Boolean = {
+    x match {
+      case _: ColumnarFilterExec | _: ColumnarConditionProjectExec => true
+      case _ => false
+    }
+  }
+
   def replaceWithColumnarPlan(plan: SparkPlan): SparkPlan = plan match {
     case plan: RowGuard =>
       val actualPlan: SparkPlan = plan.child match {
@@ -112,7 +119,7 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
     case plan: HashAggregateExec if enableColumnarHashAgg =>
       val child = replaceWithColumnarPlan(plan.child)
       logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.")
-      if (enableFusion) {
+      if (enableFusion && !isSupportAdaptive) {
         if (plan.aggregateExpressions.forall(_.mode == Partial)) {
           child match {
             case proj1 @ ColumnarProjectExec(_,
@@ -125,7 +132,10 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
                         join4 @ ColumnarBroadcastHashJoinExec(_, _, _, _, _,
                           filter @ ColumnarFilterExec(_,
                             scan @ ColumnarFileSourceScanExec(_, _, _, _, _, _, _, _, _)
-                          ), _, _)), _, _)), _, _)), _, _)) =>
+                          ), _, _)), _, _)), _, _)), _, _))
+                          if checkBhjRightChild(
+                            child.asInstanceOf[ColumnarProjectExec].child.children(1)
+                              .asInstanceOf[ColumnarBroadcastExchangeExec].child) =>
               ColumnarMultipleOperatorExec(
                 plan,
                 proj1,
@@ -153,7 +163,10 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
                     proj3 @ ColumnarProjectExec(_,
                       join3 @ ColumnarBroadcastHashJoinExec(_, _, _, _, _, _,
                         filter @ ColumnarFilterExec(_,
-                          scan @ ColumnarFileSourceScanExec(_, _, _, _, _, _, _, _, _)), _)) , _, _)), _, _)) =>
+                          scan @ ColumnarFileSourceScanExec(_, _, _, _, _, _, _, _, _)), _)) , _, _)), _, _))
+                          if checkBhjRightChild(
+                            child.asInstanceOf[ColumnarProjectExec].child.children(1)
+                              .asInstanceOf[ColumnarBroadcastExchangeExec].child) =>
               ColumnarMultipleOperatorExec1(
                 plan,
                 proj1,
@@ -179,7 +192,10 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
                     proj3 @ ColumnarProjectExec(_,
                       join3 @ ColumnarBroadcastHashJoinExec(_, _, _, _, _,
                         filter @ ColumnarFilterExec(_,
-                          scan @ ColumnarFileSourceScanExec(_, _, _, _, _, _, _, _, _)), _, _)) , _, _)), _, _)) =>
+                          scan @ ColumnarFileSourceScanExec(_, _, _, _, _, _, _, _, _)), _, _)) , _, _)), _, _))
+                          if checkBhjRightChild(
+                            child.asInstanceOf[ColumnarProjectExec].child.children(1)
+                              .asInstanceOf[ColumnarBroadcastExchangeExec].child) =>
               ColumnarMultipleOperatorExec1(
                 plan,
                 proj1,
@@ -295,6 +311,27 @@ case class ColumnarPreOverrides() extends Rule[SparkPlan] {
       val child = replaceWithColumnarPlan(plan.child)
       logInfo(s"Columnar Processing for ${plan.getClass} is currently supported.")
       new ColumnarShuffleExchangeExec(plan.outputPartitioning, child)
+    case plan: CustomShuffleReaderExec if columnarConf.enableColumnarShuffle =>
+      plan.child match {
+        case shuffle: ColumnarShuffleExchangeExec =>
+          logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+          ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs)
+        case ShuffleQueryStageExec(_, shuffle: ColumnarShuffleExchangeExec) =>
+          logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+          ColumnarCustomShuffleReaderExec(plan.child, plan.partitionSpecs)
+        case ShuffleQueryStageExec(_, reused: ReusedExchangeExec) =>
+          reused match {
+            case ReusedExchangeExec(_, shuffle: ColumnarShuffleExchangeExec) =>
+              logDebug(s"Columnar Processing for ${plan.getClass} is currently supported.")
+              ColumnarCustomShuffleReaderExec(
+                plan.child,
+                plan.partitionSpecs)
+            case _ =>
+              plan
+          }
+        case _ =>
+          plan
+      }
     case p =>
       val children = plan.children.map(replaceWithColumnarPlan)
       logInfo(s"Columnar Processing for ${p.getClass} is currently not supported.")
@@ -318,14 +355,20 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
       val child = replaceWithColumnarPlan(plan.child)
       logInfo(s"Columnar Processing for ${plan.getClass} is currently supported")
       RowToOmniColumnarExec(child)
+    case ColumnarToRowExec(child: ColumnarShuffleExchangeExec) =>
+      replaceWithColumnarPlan(child)
     case ColumnarToRowExec(child: ColumnarBroadcastExchangeExec) =>
       replaceWithColumnarPlan(child)
+    case plan: ColumnarToRowExec =>
+      val child = replaceWithColumnarPlan(plan.child)
+      OmniColumnarToRowExec(child)
     case r: SparkPlan
       if !r.isInstanceOf[QueryStageExec] && !r.supportsColumnar &&
         r.children.exists(c => c.isInstanceOf[ColumnarToRowExec]) =>
       val children = r.children.map {
         case c: ColumnarToRowExec =>
-          c.withNewChildren(c.children.map(replaceWithColumnarPlan))
+          val child = replaceWithColumnarPlan(c.child)
+          OmniColumnarToRowExec(child)
         case other =>
           replaceWithColumnarPlan(other)
       }
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
index bc7d79a91..8e57a7397 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarBroadcastExchangeExec.scala
@@ -22,16 +22,16 @@ import java.util.concurrent._
 import com.huawei.boostkit.spark.util.OmniAdaptorUtil.transColBatchToOmniVecs
 import nova.hetu.omniruntime.vector.VecBatch
 import nova.hetu.omniruntime.vector.serialize.VecBatchSerializerFactory
+
 import scala.concurrent.{ExecutionContext, Promise}
 import scala.concurrent.duration.NANOSECONDS
 import scala.util.control.NonFatal
-
 import org.apache.spark.{broadcast, SparkException}
 import org.apache.spark.launcher.SparkLauncher
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.physical.BroadcastMode
-import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
+import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, BroadcastExchangeLike}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
 import org.apache.spark.unsafe.map.BytesToBytesMap
@@ -39,7 +39,7 @@ import org.apache.spark.util.{SparkFatalException, ThreadUtils}
 
 class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
-  extends BroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) {
+  extends BroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan) with BroadcastExchangeLike {
   import ColumnarBroadcastExchangeExec._
 
   override def nodeName: String = "OmniColumnarBroadcastExchange"
@@ -54,6 +54,10 @@ class ColumnarBroadcastExchangeExec(mode: BroadcastMode, child: SparkPlan)
   @transient
   private lazy val promise = Promise[broadcast.Broadcast[Any]]()
 
+  @transient
+  override lazy val completionFuture: scala.concurrent.Future[broadcast.Broadcast[Any]] =
+    promise.future
+
   @transient
   private val timeout: Long = SQLConf.get.broadcastTimeout
 
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala
index 6f27cf3f0..a4056e6b5 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarExec.scala
@@ -328,56 +328,3 @@ case class OmniColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransiti
     }
   }
 }
-
-/**
- * Apply any user defined [[ColumnarRule]]s and find the correct place to insert transitions
- * to/from columnar formatted data.
- */
-case class ApplyColumnarRulesAndInsertTransitions(columnarRules: Seq[ColumnarRule])
-  extends Rule[SparkPlan] {
-
-  /**
-   * Inserts an transition to columnar formatted data.
-   */
-  private def insertRowToColumnar(plan: SparkPlan): SparkPlan = {
-    if (!plan.supportsColumnar) {
-      // The tree feels kind of backwards
-      // Columnar Processing will start here, so transition from row to columnar
-      RowToOmniColumnarExec(insertTransitions(plan))
-    } else if (!plan.isInstanceOf[RowToColumnarTransition]) {
-      plan.withNewChildren(plan.children.map(insertRowToColumnar))
-    } else {
-      plan
-    }
-  }
-
-  /**
-   * Inserts RowToColumnarExecs and ColumnarToRowExecs where needed.
-   */
-  private def insertTransitions(plan: SparkPlan): SparkPlan = {
-    if (plan.supportsColumnar) {
-      // The tree feels kind of backwards
-      // This is the end of the columnar processing so go back to rows
-      if (conf.getConfString("spark.omni.sql.columnar.columnarToRow", "true").toBoolean) {
-        OmniColumnarToRowExec(insertRowToColumnar(plan))
-      } else {
-        ColumnarToRowExec(insertRowToColumnar(plan))
-      }
-    } else if (!plan.isInstanceOf[ColumnarToRowTransition]) {
-      plan.withNewChildren(plan.children.map(insertTransitions))
-    } else {
-      plan
-    }
-  }
-
-  def apply(plan: SparkPlan): SparkPlan = {
-    var preInsertPlan: SparkPlan = plan
-    columnarRules.foreach((r: ColumnarRule) =>
-      preInsertPlan = r.preColumnarTransitions(preInsertPlan))
-    var postInsertPlan = insertTransitions(preInsertPlan)
-    columnarRules.reverse.foreach((r: ColumnarRule) =>
-      postInsertPlan = r.postColumnarTransitions(postInsertPlan))
-    postInsertPlan
-  }
-}
-
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
index 96cb162a3..559224613 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/ColumnarShuffleExchangeExec.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleOrigin}
+import org.apache.spark.sql.execution.exchange.{ENSURE_REQUIREMENTS, ShuffleExchangeExec, ShuffleExchangeLike, ShuffleOrigin}
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec.createShuffleWriteProcessor
 import org.apache.spark.sql.execution.metric._
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleWriteMetricsReporter}
@@ -56,7 +56,7 @@ class ColumnarShuffleExchangeExec(
     override val outputPartitioning: Partitioning,
     child: SparkPlan,
     shuffleOrigin: ShuffleOrigin = ENSURE_REQUIREMENTS)
-  extends ShuffleExchangeExec(outputPartitioning, child) {
+  extends ShuffleExchangeExec(outputPartitioning, child) with ShuffleExchangeLike{
 
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
@@ -94,6 +94,10 @@ class ColumnarShuffleExchangeExec(
     }
   }
 
+  override def numMappers: Int = columnarShuffleDependency.rdd.getNumPartitions
+
+  override def numPartitions: Int = columnarShuffleDependency.partitioner.numPartitions
+
   @transient
   lazy val columnarShuffleDependency: ShuffleDependency[Int, ColumnarBatch, ColumnarBatch] = {
     ColumnarShuffleExchangeExec.prepareShuffleDependency(
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala
new file mode 100644
index 000000000..30f6146e5
--- /dev/null
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.adaptive
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning}
+import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.exchange.{ReusedExchangeExec, ShuffleExchangeLike}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import scala.collection.mutable.ArrayBuffer
+
+
+/**
+ * A wrapper of shuffle query stage, which follows the given partition arrangement.
+ *
+ * @param child It is usually `ShuffleQueryStageExec`, but can be the shuffle exchange
+ *              node during canonicalization.
+ * @param partitionSpecs The partition specs that defines the arrangement.
+ */
+case class ColumnarCustomShuffleReaderExec(
+    child: SparkPlan,
+    partitionSpecs: Seq[ShufflePartitionSpec])
+  extends UnaryExecNode {
+  // If this reader is to read shuffle files locally, then all partition specs should be
+  // `PartialMapperPartitionSpec`.
+  if (partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])) {
+    assert(partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]))
+  }
+
+  override def supportsColumnar: Boolean = true
+
+  override def output: Seq[Attribute] = child.output
+  override lazy val outputPartitioning: Partitioning = {
+    // If it is a local shuffle reader with one mapper per task, then the output partitioning is
+    // the same as the plan before shuffle.
+    // TODO this check is based on assumptions of callers' behavior but is sufficient for now.
+    if (partitionSpecs.nonEmpty &&
+        partitionSpecs.forall(_.isInstanceOf[PartialMapperPartitionSpec]) &&
+        partitionSpecs.map(_.asInstanceOf[PartialMapperPartitionSpec].mapIndex).toSet.size ==
+          partitionSpecs.length) {
+      child match {
+        case ShuffleQueryStageExec(_, s: ShuffleExchangeLike) =>
+          s.child.outputPartitioning
+        case ShuffleQueryStageExec(_, r @ ReusedExchangeExec(_, s: ShuffleExchangeLike)) =>
+          s.child.outputPartitioning match {
+            case e: Expression => r.updateAttr(e).asInstanceOf[Partitioning]
+            case other => other
+          }
+        case _ =>
+          throw new IllegalStateException("operating on canonicalization plan")
+      }
+    } else {
+      UnknownPartitioning(partitionSpecs.length)
+    }
+  }
+
+  override def stringArgs: Iterator[Any] = {
+    val desc = if (isLocalReader) {
+      "local"
+    } else if (hasCoalescedPartition && hasSkewedPartition) {
+      "coalesced and skewed"
+    } else if (hasCoalescedPartition) {
+      "coalesced"
+    } else if (hasSkewedPartition) {
+      "skewed"
+    } else {
+      ""
+    }
+    Iterator(desc)
+  }
+
+  def hasCoalescedPartition: Boolean =
+    partitionSpecs.exists(_.isInstanceOf[CoalescedPartitionSpec])
+
+  def hasSkewedPartition: Boolean =
+    partitionSpecs.exists(_.isInstanceOf[PartialReducerPartitionSpec])
+
+  def isLocalReader: Boolean =
+    partitionSpecs.exists(_.isInstanceOf[PartialMapperPartitionSpec])
+
+  private def shuffleStage = child match {
+    case stage: ShuffleQueryStageExec => Some(stage)
+    case _ => None
+  }
+
+  @transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
+    if (partitionSpecs.nonEmpty && !isLocalReader && shuffleStage.get.mapStats.isDefined) {
+      val bytesByPartitionId = shuffleStage.get.mapStats.get.bytesByPartitionId
+      Some(partitionSpecs.map {
+        case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+          startReducerIndex.until(endReducerIndex).map(bytesByPartitionId).sum
+        case p: PartialReducerPartitionSpec => p.dataSize
+        case p => throw new IllegalStateException("unexpected " + p)
+      })
+    } else {
+      None
+    }
+  }
+
+  private def sendDriverMetrics(): Unit = {
+    val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
+    val driverAccumUpdates = ArrayBuffer.empty[(Long, Long)]
+
+    val numPartitionsMetric = metrics("numPartitions")
+    numPartitionsMetric.set(partitionSpecs.length)
+    driverAccumUpdates += (numPartitionsMetric.id -> partitionSpecs.length.toLong)
+
+    if (hasSkewedPartition) {
+      val skewedSpecs = partitionSpecs.collect {
+        case p: PartialReducerPartitionSpec => p
+      }
+
+      val skewedPartitions = metrics("numSkewedPartitions")
+      val skewedSplits = metrics("numSkewedSplits")
+
+      val numSkewedPartitions = skewedSpecs.map(_.reducerIndex).distinct.length
+      val numSplits = skewedSpecs.length
+
+      skewedPartitions.set(numSkewedPartitions)
+      driverAccumUpdates += (skewedPartitions.id -> numSkewedPartitions)
+
+      skewedSplits.set(numSplits)
+      driverAccumUpdates += (skewedSplits.id -> numSplits)
+    }
+
+    partitionDataSizes.foreach { dataSizes =>
+      val partitionDataSizeMetrics = metrics("partitionDataSize")
+      driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _)
+      // Set sum value to "partitionDataSize" metric.
+      partitionDataSizeMetrics.set(dataSizes.sum)
+    }
+
+    SQLMetrics.postDriverMetricsUpdatedByValue(sparkContext, executionId, driverAccumUpdates.toSeq)
+  }
+
+  @transient override lazy val metrics: Map[String, SQLMetric] = {
+    if (shuffleStage.isDefined) {
+      Map("numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions")) ++ {
+        if (isLocalReader) {
+          // We split the mapper partition evenly when creating local shuffle reader, so no
+          // data size info is available.
+          Map.empty
+        } else {
+          Map("partitionDataSize" ->
+            SQLMetrics.createSizeMetric(sparkContext, "partition data size"))
+        }
+      } ++ {
+        if (hasSkewedPartition) {
+          Map("numSkewedPartitions" ->
+            SQLMetrics.createMetric(sparkContext, "number of skewed partitions"),
+            "numSkewedSplits" ->
+              SQLMetrics.createMetric(sparkContext, "number of skewed partition splits"))
+        } else {
+          Map.empty
+        }
+      }
+    } else {
+      // It's a canonicalized plan, no need to report metrics.
+      Map.empty
+    }
+  }
+
+  private var cachedShuffleRDD: RDD[ColumnarBatch] = null
+
+  private lazy val shuffleRDD: RDD[_] = {
+    sendDriverMetrics()
+    if (cachedShuffleRDD == null) {
+      cachedShuffleRDD = child match {
+        case stage: ShuffleQueryStageExec =>
+          new ShuffledColumnarRDD(
+            stage.shuffle
+              .asInstanceOf[ColumnarShuffleExchangeExec]
+              .columnarShuffleDependency,
+            stage.shuffle.asInstanceOf[ColumnarShuffleExchangeExec].readMetrics,
+            partitionSpecs.toArray)
+        case _ =>
+          throw new IllegalStateException("operating on canonicalized plan")
+      }
+    }
+    cachedShuffleRDD
+  }
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    shuffleRDD.asInstanceOf[RDD[InternalRow]]
+  }
+
+  override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    shuffleRDD.asInstanceOf[RDD[ColumnarBatch]]
+  }
+}
-- 
Gitee

From 9f6dd2da70b05c143296fd2313eb1805b0ec7190 Mon Sep 17 00:00:00 2001
From: maxiaoqi2020
Date: Tue, 23 Aug 2022 19:17:36 +0800
Subject: [PATCH 2/2] modify according to the comments

---
 .../scala/com/huawei/boostkit/spark/ColumnarPlugin.scala | 6 +++++-
 .../adaptive/ColumnarCustomShuffleReaderExec.scala       | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
index 7c2763638..cf06fedef 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/com/huawei/boostkit/spark/ColumnarPlugin.scala
@@ -361,7 +361,11 @@ case class ColumnarPostOverrides() extends Rule[SparkPlan] {
       replaceWithColumnarPlan(child)
     case plan: ColumnarToRowExec =>
       val child = replaceWithColumnarPlan(plan.child)
-      OmniColumnarToRowExec(child)
+      if (conf.getConfString("spark.omni.sql.columnar.columnarToRow", "true").toBoolean) {
+        OmniColumnarToRowExec(child)
+      } else {
+        ColumnarToRowExec(child)
+      }
     case r: SparkPlan
       if !r.isInstanceOf[QueryStageExec] && !r.supportsColumnar &&
         r.children.exists(c => c.isInstanceOf[ColumnarToRowExec]) =>
diff --git a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala
index 30f6146e5..f4f554879 100644
--- a/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala
+++ b/omnioperator/omniop-spark-extension/java/src/main/scala/org/apache/spark/sql/execution/adaptive/ColumnarCustomShuffleReaderExec.scala
@@ -200,7 +200,7 @@ case class ColumnarCustomShuffleReaderExec(
   }
 
   override protected def doExecute(): RDD[InternalRow] = {
-    shuffleRDD.asInstanceOf[RDD[InternalRow]]
+    throw new UnsupportedOperationException(s"This operator doesn't support doExecute().")
   }
 
   override protected def doExecuteColumnar(): RDD[ColumnarBatch] = {
-- 
Gitee