From 3be3c5adea91873b4c9b0d5421ac5fc7c7927a5f Mon Sep 17 00:00:00 2001
From: zhanggougou <15651908511@163.com>
Date: Fri, 8 Aug 2025 10:03:51 +0800
Subject: [PATCH 1/2] Optimize Spark lineage parsing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
fire-engines/fire-spark/pom.xml | 6 +
.../SparkSqlParser.scala | 132 +++++++++++++++++-
.../scala/com/zto/fire/spark/BaseSpark.scala | 4 +
.../spark/listener/FireSparkListener.scala | 9 +-
.../fire/spark/sql/SparkSqlParserBase.scala | 29 ++++
.../sync/SparkLineageAccumulatorManager.scala | 39 ++++++
6 files changed, 217 insertions(+), 2 deletions(-)
diff --git a/fire-engines/fire-spark/pom.xml b/fire-engines/fire-spark/pom.xml
index a4319225..a892a68f 100644
--- a/fire-engines/fire-spark/pom.xml
+++ b/fire-engines/fire-spark/pom.xml
@@ -69,6 +69,12 @@
             <version>${spark.version}</version>
             <scope>${maven.scope}</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>${maven.scope}</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
diff --git a/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
index a4919d03..5573e9ff 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
@@ -24,9 +24,12 @@ import com.zto.fire.common.enu.Operation
import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
import com.zto.fire.spark.util.TiSparkUtils
import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
/**
* Spark SQL parser that extracts database, table, partition, and operation-type information from Spark SQL statements
@@ -149,6 +152,133 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
sinkTable
}
+ /**
+ * Parses database/table and partition information from DDL statements
+ *
+ * @return the sink table, used to maintain table-to-table relationships
+ */
+ override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+ var sinkTable: Option[TableIdentifier] = None
+
+ sparkPlan.collect {
+ // Table scan on a Hive relation
+ case plan if plan.getClass.getName == "org.apache.spark.sql.hive.execution.HiveTableScanExec" =>
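+ // HiveTableScanExec is not visible outside Spark's hive package, so its relation field is read via reflection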
+ val relationField = plan.getClass.getDeclaredField("relation")
+ relationField.setAccessible(true)
+ val relation = relationField.get(plan).asInstanceOf[HiveTableRelation]
+ val tableIdentifier = this.toFireTableIdentifier(relation.tableMeta.identifier)
+ LineageManager.printLog(s"Parsed SELECT table: $tableIdentifier")
+ this.addCatalog(tableIdentifier, Operation.SELECT)
+ sinkTable = Some(tableIdentifier)
+ // Table write commands
+ case plan: DataWritingCommandExec =>
+ plan.cmd match {
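+ // CTAS variants register the target table as CREATE_TABLE; plain inserts are recorded as INSERT_INTO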
+ case CreateDataSourceTableAsSelectCommand(table, mode, query, outputColumnNames) =>
+ val tableIdentifier = this.toFireTableIdentifier(table.identifier)
+ this.addCatalog(tableIdentifier, Operation.CREATE_TABLE)
+ sinkTable = Some(tableIdentifier)
+ case CreateHiveTableAsSelectCommand(tableDesc, query, outputColumnNames, mode) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableDesc.identifier)
+ this.addCatalog(tableIdentifier, Operation.CREATE_TABLE)
+ sinkTable = Some(tableIdentifier)
+ case InsertIntoHadoopFsRelationCommand(outputPath, staticPartitions, ifPartitionNotExists, partitionColumns, bucketSpec, fileFormat, options, query, mode, catalogTable, fileIndex, outputColumnNames) =>
+ case InsertIntoHiveTable(table, partition, query, overwrite, ifPartitionNotExists, outputColumnNames) => {
+ val tableIdentifier = this.toFireTableIdentifier(table.identifier)
+ this.addCatalog(tableIdentifier, Operation.INSERT_INTO)
+ sinkTable = Some(tableIdentifier)
+ }
+ case InsertIntoHiveDirCommand(isLocal, storage, query, overwrite, outputColumnNames) =>
+ }
+ // Other runnable commands
+ case plan: ExecutedCommandExec => plan.cmd match {
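+ // Metadata-changing commands are recorded as ALTER_TABLE lineage; the remaining commands are matched explicitly and intentionally ignored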
+ case AddFileCommand(path) =>
+ case AddJarCommand(path) =>
+ case AlterDatabasePropertiesCommand(databaseName, props) =>
+ case AlterTableAddColumnsCommand(tableName, colsToAdd) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterTableChangeColumnCommand(tableName, columnName, newColumn) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterTableRecoverPartitionsCommand(tableName, cmd) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterTableRenameCommand(oldName, newName, isView) =>
+ val oldTableIdentifier = this.toFireTableIdentifier(oldName)
+ this.addCatalog(oldTableIdentifier, Operation.ALTER_TABLE)
+ val tableIdentifier = this.toFireTableIdentifier(newName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterTableRenamePartitionCommand(tableName, oldPartition, newPartition) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterTableSerDePropertiesCommand(tableName, serdeClassName, serdeProperties, partSpec) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterTableSetLocationCommand(tableName, partitionSpec, location) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterTableSetPropertiesCommand(tableName, properties, isView) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterTableUnsetPropertiesCommand(tableName, propKeys, ifExists, isView) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AlterViewAsCommand(tableName, originalText, query) =>
+ val tableIdentifier = this.toFireTableIdentifier(tableName)
+ this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+ case AnalyzeColumnCommand(tableIdent, columnNames) =>
+ case AnalyzePartitionCommand(tableIdent, partitionSpec, noscan) =>
+ case AnalyzeTableCommand(tableIdent, noscan) =>
+ case CacheTableCommand(tableIdent, plan, isLazy) =>
+ case ClearCacheCommand() =>
+ case CreateDataSourceTableCommand(table, ignoreIfExists) =>
+ case CreateDatabaseCommand(databaseName, ifNotExists, path, comment, props) =>
+ case CreateFunctionCommand(databaseName, functionName, className, resources, isTemp, ignoreIfExists, replace) =>
+ case CreateTableCommand(table, ignoreIfExists) =>
+ case CreateTableLikeCommand(targetTable, sourceTable, location, ifNotExists) =>
+ case CreateTempViewUsing(tableIdent, userSpecifiedSchema, replace, global, provider, options) =>
+ case CreateViewCommand(name, userSpecifiedColumns, comment, properties, originalText, child, allowExisting, replace, viewType) =>
+ case DescribeColumnCommand(tableName, colNameParts, isExtended) =>
+ case DescribeDatabaseCommand(databaseName, extended) =>
+ case DescribeFunctionCommand(functionName, isExtended) =>
+ case DescribeTableCommand(tableName, partitionSpec, isExtended) =>
+ case DropDatabaseCommand(databaseName, ifExists, cascade) =>
+ case DropFunctionCommand(databaseName, functionName, ifExists, isTemp) =>
+ case DropTableCommand(tableName, ifExists, isView, purge) =>
+ case ExplainCommand(logicalPlan, extended, codegen, cost) =>
+ case InsertIntoDataSourceCommand(logicalRelation, query, overwrite) =>
+ case InsertIntoDataSourceDirCommand(storage, provider, query, overwrite) =>
+ case ListFilesCommand(files) =>
+ case ListJarsCommand(jars) =>
+ case LoadDataCommand(table, path, isLocal, isOverwrite, partition) =>
+ case RefreshResource(path) =>
+ case RefreshTable(tableIdent) =>
+ case ResetCommand =>
+ case SaveIntoDataSourceCommand(query, dataSource, options, mode) =>
+ case SetCommand(kv) =>
+ case SetDatabaseCommand(databaseName) =>
+ case ShowColumnsCommand(databaseName, tableName) =>
+ case ShowCreateTableCommand(tableName) =>
+ case ShowDatabasesCommand(databasePattern) =>
+ case ShowFunctionsCommand(db, pattern, showUserFunctions, showSystemFunctions) =>
+ case ShowPartitionsCommand(tableName, spec) =>
+ case ShowTablePropertiesCommand(tableName, propertyKey) =>
+ case ShowTablesCommand(databaseName, tableIdentifierPattern, isExtended, partitionSpec) =>
+ case StreamingExplainCommand(queryExecution, extended) =>
+ case TruncateTableCommand(tableName, partitionSpec) =>
+ case UncacheTableCommand(tableIdent, ifExists) =>
+ case _ =>
+ }
+ }
+ sinkTable
+ }
+
/**
* Determines whether the given table is a temporary table
*/
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala
index 7c332af7..7864e267 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala
@@ -87,8 +87,12 @@ trait BaseSpark extends SparkListener with BaseFire with Serializable {
if (noEmpty(this._spark, this.sc) && this.sc.isStarted) {
// Trigger one lineage collection pass before exit, so short-lived spark core jobs still get their lineage collected
if (FireFrameworkConf.accEnable && FireFrameworkConf.lineageEnable) {
+ // Spark event delivery may lag, so sleep for a fixed period before collecting
+ logInfo("<-- Starting spark context shutdown -->")
+ Thread.sleep(FireFrameworkConf.lineageShutdownSleep * 1000)
AccumulatorManager.collectDistributeLineage(false)
Thread.sleep(FireFrameworkConf.lineageShutdownSleep * 1000)
+ logInfo("<-- Finished spark context shutdown -->")
}
if (this.sc.isStarted) {
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala
index 05011385..4e0e2942 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala
@@ -25,9 +25,10 @@ import com.zto.fire.core.util.ErrorToleranceAcc
import com.zto.fire.spark.BaseSpark
import com.zto.fire.spark.acc.AccumulatorManager
import com.zto.fire.spark.conf.FireSparkConf
-import com.zto.fire.spark.sync.SyncSparkEngine
+import com.zto.fire.spark.sync.{SparkLineageAccumulatorManager, SyncSparkEngine}
import com.zto.fire.spark.util.SparkSingletonFactory
import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
@@ -188,6 +189,12 @@ private[fire] class FireSparkListener(baseSpark: BaseSpark) extends SparkListene
*/
override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = this.baseSpark.onUnpersistRDD(unpersistRDD)
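+
+ /**
+ * Forwards SQL execution-end events to the lineage accumulator manager for parsing
+ */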
+ override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+ case e: SparkListenerSQLExecutionEnd => SparkLineageAccumulatorManager.onExecutionEnd(e)
+ case _ => // Ignore
+ }
+
/**
* Registers the built-in accumulators: runs once per minute, starts after a one-minute delay, executes 10 times by default
*/
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
index 0835c1cf..3c9b3e3b 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
@@ -31,6 +31,7 @@ import com.zto.fire.spark.util.{SparkSingletonFactory, SparkUtils, TiSparkUtils}
import org.apache.spark.sql.catalyst.catalog.CatalogTable
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.{TableIdentifier => SparkTableIdentifier}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
/**
@@ -124,6 +125,26 @@ private[fire] trait SparkSqlParserBase extends SqlParser {
}
}
+ /**
+ * Parses database/table information from Spark SQL (physical execution plan)
+ */
+ @Internal
+ def sqlParserWithExecution(queryExecution: QueryExecution): Unit = {
+ val sparkPlan = queryExecution.sparkPlan
+ var sinkTable: Option[TableIdentifier] = None
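+ // The physical plan determines the sink table; the optimized logical plan is then parsed for the source tables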
+ try {
+ sinkTable = this.ddlParserWithPlan(sparkPlan)
+ } catch {
+ case e: Throwable =>
+ LineageManager.printLog(s"Ignorable exception: real-time lineage SQL parsing failed (${e.getMessage}), sparkPlan: ${sparkPlan}")
+ } finally {
+ tryWithLog {
+ this.queryParser(queryExecution.optimizedPlan, sinkTable)
+ }(this.logger, catchLog = s"Ignorable exception: real-time lineage SQL parsing failed, sparkPlan: ${sparkPlan}", isThrow = FireFrameworkConf.lineageDebugEnable, hook = false)
+ }
+ }
+
/**
* Determines whether the given table is a hive table
*
@@ -225,6 +246,14 @@ private[fire] trait SparkSqlParserBase extends SqlParser {
@Internal
protected def ddlParser(logicalPlan: LogicalPlan): Option[TableIdentifier]
+ /**
+ * Parses database/table and partition information from DDL statements (physical-plan variant)
+ *
+ * @return the sink table, used to maintain table-to-table relationships
+ */
+ @Internal
+ protected def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier]
+
/**
* Converts a Fire TableIdentifier to a Spark TableIdentifier
*/
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala
index 5b9408c4..bb43503d 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala
@@ -18,11 +18,17 @@
package com.zto.fire.spark.sync
import com.zto.fire.common.bean.lineage.Lineage
+import com.zto.fire.common.conf.FireFrameworkConf
import com.zto.fire.common.enu.Datasource
import com.zto.fire.common.lineage.{DatasourceDesc, SQLLineageManager}
+import com.zto.fire.common.util.ReflectionUtils
import com.zto.fire.core.sync.LineageAccumulatorManager
import com.zto.fire.predef._
import com.zto.fire.spark.acc.AccumulatorManager
+import com.zto.fire.spark.conf.FireSparkConf
+import com.zto.fire.spark.sql.SparkSqlParser
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
/**
* Collects data from each executor side to the driver side
@@ -32,6 +38,9 @@ import com.zto.fire.spark.acc.AccumulatorManager
*/
object SparkLineageAccumulatorManager extends LineageAccumulatorManager {
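+ // The event class is looked up by name so its qe field can be probed reflectively across spark versions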
+ val SPARK_LISTENER_CLASS = "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd"
+
/**
* Puts lineage information into the accumulator
*/
@@ -50,4 +59,34 @@ object SparkLineageAccumulatorManager extends LineageAccumulatorManager {
override def getValue: Lineage = {
new Lineage(AccumulatorManager.getLineage, SQLLineageManager.getSQLLineage)
}
+
+ /**
+ * The open-source version is temporarily unable to listen for SparkListenerSQLExecutionEnd directly
+ *
+ * @param event the SQL execution-end event
+ */
+ def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+ if (!FireFrameworkConf.lineageEnable || !FireSparkConf.sparkLineageListenerEnable) {
+ logInfo("已手动关闭spark血缘采集,如需开启请检查配置")
+ }
+
+ tryWithLog {
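+ // Not every spark build exposes a qe field on SparkListenerSQLExecutionEnd, so probe for it before reading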
+ val clazz = Class.forName(this.SPARK_LISTENER_CLASS)
+ if (ReflectionUtils.containsField(clazz, "qe")) {
+ val queryExecution = ReflectionUtils.getFieldValue(event, "qe").asInstanceOf[QueryExecution]
+
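+ // Skip trivial local scans (e.g. a SELECT over literal rows), which carry no table lineage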
+ if (Option(queryExecution)
+ .flatMap(qe => Option(qe.sparkPlan))
+ .forall(_.toString == "LocalTableScan \n")) {
+ logInfo("LocalTableScan onExecutionEnd skipping")
+ return
+ }
+
+ SparkSqlParser.sqlParserWithExecution(queryExecution)
+ } else {
+ logWarning("当前spark版本不支持血缘采集")
+ }
+ }(logger, "", "spark监听血缘信息解析失败", isThrow = false, hook = false)
+
+ }
}
--
Gitee
From 41bbdf338151c1dd055dde9ee20382b3182f9b83 Mon Sep 17 00:00:00 2001
From: zhanggougou <15651908511@163.com>
Date: Fri, 8 Aug 2025 10:44:53 +0800
Subject: [PATCH 2/2] Optimize Spark lineage parsing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../com.zto.fire.spark.sql/SparkSqlParser.scala | 11 +++++++++++
.../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
.../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
.../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
.../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
.../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
.../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
7 files changed, 77 insertions(+)
diff --git a/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala
index 9b0c3f20..83f17a72 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala
@@ -25,6 +25,7 @@ import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
import com.zto.fire.spark.util.TiSparkUtils
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.datasources.CreateTable
@@ -149,6 +150,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
sinkTable
}
+ /**
+ * For now only spark 2.3 needs to parse the physical execution plan; other versions do not
+ *
+ * @param sparkPlan the physical execution plan
+ * @return None; no physical-plan parsing is required on this version
+ */
+ override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+ None
+ }
+
/**
* Determines whether the given table is a temporary table
*/
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala
index 8fc202b0..beb45793 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -26,6 +26,7 @@ import com.zto.fire.spark.util.TiSparkUtils
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.CreateViewCommand
import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
@@ -201,6 +202,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
sinkTable
}
+ /**
+ * For now only spark 2.3 needs to parse the physical execution plan; other versions do not
+ *
+ * @param sparkPlan the physical execution plan
+ * @return None; no physical-plan parsing is required on this version
+ */
+ override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+ None
+ }
+
/**
* Determines whether the given table is a temporary table
*/
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala
index c6642208..97ff2006 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.{AlterTableDropPartitionCommand, CacheTableCommand, CreateViewCommand, RefreshTableCommand, UncacheTableCommand}
import org.apache.spark.sql.execution.datasources.CreateTable
@@ -146,6 +147,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
sinkTable
}
+ /**
+ * For now only spark 2.3 needs to parse the physical execution plan; other versions do not
+ *
+ * @param sparkPlan the physical execution plan
+ * @return None; no physical-plan parsing is required on this version
+ */
+ override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+ None
+ }
+
/**
* Determines whether the given table is a temporary table
*/
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala
index 4c14fded..fe682209 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
import org.apache.spark.sql.execution.datasources.CreateTable
@@ -148,6 +149,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
sinkTable
}
+ /**
+ * For now only spark 2.3 needs to parse the physical execution plan; other versions do not
+ *
+ * @param sparkPlan the physical execution plan
+ * @return None; no physical-plan parsing is required on this version
+ */
+ override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+ None
+ }
+
/**
* Determines whether the given table is a temporary table
*/
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala
index c84eb0ef..49c50756 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
import org.apache.spark.sql.execution.datasources.CreateTable
@@ -130,6 +131,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
sinkTable
}
+ /**
+ * For now only spark 2.3 needs to parse the physical execution plan; other versions do not
+ *
+ * @param sparkPlan the physical execution plan
+ * @return None; no physical-plan parsing is required on this version
+ */
+ override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+ None
+ }
+
/**
* Determines whether the given table is a temporary table
*/
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala
index c84eb0ef..49c50756 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
import org.apache.spark.sql.execution.datasources.CreateTable
@@ -130,6 +131,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
sinkTable
}
+ /**
+ * For now only spark 2.3 needs to parse the physical execution plan; other versions do not
+ *
+ * @param sparkPlan the physical execution plan
+ * @return None; no physical-plan parsing is required on this version
+ */
+ override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+ None
+ }
+
/**
* Determines whether the given table is a temporary table
*/
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala
index c84eb0ef..49c50756 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
import org.apache.spark.sql.execution.datasources.CreateTable
@@ -130,6 +131,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
sinkTable
}
+ /**
+ * For now only spark 2.3 needs to parse the physical execution plan; other versions do not
+ *
+ * @param sparkPlan the physical execution plan
+ * @return None; no physical-plan parsing is required on this version
+ */
+ override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+ None
+ }
+
/**
* Determines whether the given table is a temporary table
*/
--
Gitee