From 3be3c5adea91873b4c9b0d5421ac5fc7c7927a5f Mon Sep 17 00:00:00 2001
From: zhanggougou <15651908511@163.com>
Date: Fri, 8 Aug 2025 10:03:51 +0800
Subject: [PATCH 1/2] Optimize Spark lineage parsing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 fire-engines/fire-spark/pom.xml               |   6 +
 .../SparkSqlParser.scala                      | 133 +++++++++++++++++-
 .../scala/com/zto/fire/spark/BaseSpark.scala  |   4 +
 .../spark/listener/FireSparkListener.scala    |   9 +-
 .../fire/spark/sql/SparkSqlParserBase.scala   |  29 ++++
 .../sync/SparkLineageAccumulatorManager.scala |  40 ++++++
 6 files changed, 219 insertions(+), 2 deletions(-)

diff --git a/fire-engines/fire-spark/pom.xml b/fire-engines/fire-spark/pom.xml
index a4319225..a892a68f 100644
--- a/fire-engines/fire-spark/pom.xml
+++ b/fire-engines/fire-spark/pom.xml
@@ -69,6 +69,12 @@
             <version>${spark.version}</version>
             <scope>${maven.scope}</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>${maven.scope}</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
diff --git a/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
index a4919d03..5573e9ff 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
@@ -24,9 +24,12 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import com.zto.fire.spark.util.TiSparkUtils
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
 
 /**
  * Spark SQL parser for extracting database, table, partition and operation-type information from Spark SQL statements
@@ -149,6 +152,134 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * Parses database/table and partition information from DDL statements
+   *
+   * @return the sink target table, used to maintain relations between tables
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    var sinkTable: Option[TableIdentifier] = None
+
+    sparkPlan.collect {
+      // Hive table scan info
+      case plan if plan.getClass.getName == "org.apache.spark.sql.hive.execution.HiveTableScanExec" =>
+        val relationField = plan.getClass.getDeclaredField("relation")
+        relationField.setAccessible(true)
+        val relation = relationField.get(plan).asInstanceOf[HiveTableRelation]
+        val tableIdentifier = this.toFireTableIdentifier(relation.tableMeta.identifier)
+        LineageManager.printLog(s"Parsed select table name: $tableIdentifier")
+        this.addCatalog(tableIdentifier, Operation.SELECT)
+        sinkTable = Some(tableIdentifier)
+      // table write info
+      case plan: DataWritingCommandExec =>
+        plan.cmd match {
+          case CreateDataSourceTableAsSelectCommand(table, mode, query, outputColumnNames) =>
+            val tableIdentifier = this.toFireTableIdentifier(table.identifier)
+            this.addCatalog(tableIdentifier, Operation.CREATE_TABLE)
+            sinkTable = Some(tableIdentifier)
+          case CreateHiveTableAsSelectCommand(tableDesc, query, outputColumnNames, mode) =>
+            val tableIdentifier = this.toFireTableIdentifier(tableDesc.identifier)
+            this.addCatalog(tableIdentifier, Operation.CREATE_TABLE)
+            sinkTable = Some(tableIdentifier)
+          case InsertIntoHadoopFsRelationCommand(outputPath, staticPartitions, ifPartitionNotExists, partitionColumns, bucketSpec, fileFormat, options, query, mode, catalogTable, fileIndex, outputColumnNames) =>
+          case InsertIntoHiveTable(table, partition, query, overwrite, ifPartitionNotExists, outputColumnNames) => {
+            val tableIdentifier = this.toFireTableIdentifier(table.identifier)
+            this.addCatalog(tableIdentifier, Operation.INSERT_INTO)
+            sinkTable = Some(tableIdentifier)
+          }
+          case InsertIntoHiveDirCommand(isLocal, storage, query, overwrite, outputColumnNames) =>
+          case _ =>
+        }
+      // commands
+      case plan: ExecutedCommandExec => plan.cmd match {
+        case AddFileCommand(path) =>
+        case AddJarCommand(path) =>
+        case AlterDatabasePropertiesCommand(databaseName, props) =>
+        case AlterTableAddColumnsCommand(tableName, colsToAdd) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableChangeColumnCommand(tableName, columnName, newColumn) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableRecoverPartitionsCommand(tableName, cmd) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableRenameCommand(oldName, newName, isView) =>
+          val oldTableIdentifier = this.toFireTableIdentifier(oldName)
+          this.addCatalog(oldTableIdentifier, Operation.ALTER_TABLE)
+          val tableIdentifier = this.toFireTableIdentifier(newName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableRenamePartitionCommand(tableName, oldPartition, newPartition) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableSerDePropertiesCommand(tableName, serdeClassName, serdeProperties, partSpec) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableSetLocationCommand(tableName, partitionSpec, location) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableSetPropertiesCommand(tableName, properties, isView) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableUnsetPropertiesCommand(tableName, propKeys, ifExists, isView) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterViewAsCommand(tableName, originalText, query) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AnalyzeColumnCommand(tableIdent, columnNames) =>
+        case AnalyzePartitionCommand(tableIdent, partitionSpec, noscan) =>
+        case AnalyzeTableCommand(tableIdent, noscan) =>
+        case CacheTableCommand(tableIdent, plan, isLazy) =>
+        case ClearCacheCommand() =>
+        case CreateDataSourceTableCommand(table, ignoreIfExists) =>
+        case CreateDatabaseCommand(databaseName, ifNotExists, path, comment, props) =>
+        case CreateFunctionCommand(databaseName, functionName, className, resources, isTemp, ignoreIfExists, replace) =>
+        case CreateTableCommand(table, ignoreIfExists) =>
+        case CreateTableLikeCommand(targetTable, sourceTable, location, ifNotExists) =>
+        case CreateTempViewUsing(tableIdent, userSpecifiedSchema, replace, global, provider, options) =>
+        case CreateViewCommand(name, userSpecifiedColumns, comment, properties, originalText, child, allowExisting, replace, viewType) =>
+        case DescribeColumnCommand(tableName, colNameParts, isExtended) =>
+        case DescribeDatabaseCommand(databaseName, extended) =>
+        case DescribeFunctionCommand(functionName, isExtended) =>
+        case DescribeTableCommand(tableName, partitionSpec, isExtended) =>
+        case DropDatabaseCommand(databaseName, ifExists, cascade) =>
+        case DropFunctionCommand(databaseName, functionName, ifExists, isTemp) =>
+        case DropTableCommand(tableName, ifExists, isView, purge) =>
+        case ExplainCommand(logicalPlan, extended, codegen, cost) =>
+        case InsertIntoDataSourceCommand(logicalRelation, query, overwrite) =>
+        case InsertIntoDataSourceDirCommand(storage, provider, query, overwrite) =>
+        case ListFilesCommand(files) =>
+        case ListJarsCommand(jars) =>
+        case LoadDataCommand(table, path, isLocal, isOverwrite, partition) =>
+        case RefreshResource(path) =>
+        case RefreshTable(tableIdent) =>
+        case ResetCommand =>
+        case SaveIntoDataSourceCommand(query, dataSource, options, mode) =>
+        case SetCommand(kv) =>
+        case SetDatabaseCommand(databaseName) =>
+        case ShowColumnsCommand(databaseName, tableName) =>
+        case ShowCreateTableCommand(tableName) =>
+        case ShowDatabasesCommand(databasePattern) =>
+        case ShowFunctionsCommand(db, pattern, showUserFunctions, showSystemFunctions) =>
+        case ShowPartitionsCommand(tableName, spec) =>
+        case ShowTablePropertiesCommand(tableName, propertyKey) =>
+        case ShowTablesCommand(databaseName, tableIdentifierPattern, isExtended, partitionSpec) =>
+        case StreamingExplainCommand(queryExecution, extended) =>
+        case TruncateTableCommand(tableName, partitionSpec) =>
+        case UncacheTableCommand(tableIdent, ifExists) =>
+        case _ =>
+      }
+    }
+    sinkTable
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala
index 7c332af7..7864e267 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala
@@ -87,8 +87,12 @@ trait BaseSpark extends SparkListener with BaseFire with Serializable {
     if (noEmpty(this._spark, this.sc) && this.sc.isStarted) {
       // Trigger one lineage collection before exiting, so short-lived spark core jobs don't miss lineage
       if (FireFrameworkConf.accEnable && FireFrameworkConf.lineageEnable) {
+        // Spark event delivery may lag, so sleep for a fixed period
+        logInfo("<-- Spark context shutdown started -->")
+        Thread.sleep(FireFrameworkConf.lineageShutdownSleep * 1000)
         AccumulatorManager.collectDistributeLineage(false)
         Thread.sleep(FireFrameworkConf.lineageShutdownSleep * 1000)
+        logInfo("<-- Spark context shutdown finished -->")
       }
 
       if (this.sc.isStarted) {
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala
index 05011385..4e0e2942 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala
@@ -25,9 +25,10 @@ import com.zto.fire.core.util.ErrorToleranceAcc
 import com.zto.fire.spark.BaseSpark
 import com.zto.fire.spark.acc.AccumulatorManager
 import com.zto.fire.spark.conf.FireSparkConf
-import com.zto.fire.spark.sync.SyncSparkEngine
+import com.zto.fire.spark.sync.{SparkLineageAccumulatorManager, SyncSparkEngine}
 import com.zto.fire.spark.util.SparkSingletonFactory
 import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
 
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
@@ -188,6 +189,12 @@ private[fire] class FireSparkListener(baseSpark: BaseSpark) extends SparkListene
    */
  override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = this.baseSpark.onUnpersistRDD(unpersistRDD)
 
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case e: SparkListenerSQLExecutionEnd => SparkLineageAccumulatorManager.onExecutionEnd(e)
+    case _ => // Ignore
+  }
+
+
   /**
    * Registers built-in accumulators; runs every minute after a one-minute delay, 10 times by default
    */
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
index 0835c1cf..3c9b3e3b 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
@@ -31,6 +31,7 @@ import com.zto.fire.spark.util.{SparkSingletonFactory, SparkUtils, TiSparkUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.{TableIdentifier => SparkTableIdentifier}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 
 
 /**
@@ -124,6 +125,26 @@
     }
   }
 
+  /**
+   * Parses database/table information from Spark SQL via the physical execution plan
+   */
+  @Internal
+  def sqlParserWithExecution(queryExecution: QueryExecution): Unit = {
+    val sparkPlan = queryExecution.sparkPlan
+    var sinkTable: Option[TableIdentifier] = None
+    try {
+      sinkTable = this.ddlParserWithPlan(sparkPlan)
+    } catch {
+      case e: Throwable => {
+        LineageManager.printLog(s"Ignorable exception: real-time lineage SQL parsing failed, sparkPlan: ${sparkPlan}")
+      }
+    } finally {
+      tryWithLog {
+        this.queryParser(queryExecution.optimizedPlan, sinkTable)
+      }(this.logger, catchLog = s"Ignorable exception: real-time lineage SQL parsing failed, sparkPlan: ${sparkPlan}", isThrow = FireFrameworkConf.lineageDebugEnable, hook = false)
+    }
+  }
+
   /**
    * Determines whether the given table is a Hive table
    *
@@ -225,6 +246,14 @@
   @Internal
   protected def ddlParser(logicalPlan: LogicalPlan): Option[TableIdentifier]
 
+  /**
+   * Parses database/table and partition information from DDL statements
+   *
+   * @return the sink target table, used to maintain relations between tables
+   */
+  @Internal
+  protected def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier]
+
   /**
    * Converts a Fire TableIdentifier to a Spark TableIdentifier
    */
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala
index 5b9408c4..bb43503d 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala
@@ -18,11 +18,17 @@ package com.zto.fire.spark.sync
 
 import com.zto.fire.common.bean.lineage.Lineage
+import com.zto.fire.common.conf.FireFrameworkConf
 import com.zto.fire.common.enu.Datasource
 import com.zto.fire.common.lineage.{DatasourceDesc, SQLLineageManager}
+import com.zto.fire.common.util.ReflectionUtils
 import com.zto.fire.core.sync.LineageAccumulatorManager
 import com.zto.fire.predef._
 import com.zto.fire.spark.acc.AccumulatorManager
+import com.zto.fire.spark.conf.FireSparkConf
+import com.zto.fire.spark.sql.SparkSqlParser
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
 
 /**
  * Collects data from each executor to the driver
@@ -32,6 +38,9 @@ import com.zto.fire.spark.acc.AccumulatorManager
  */
 object SparkLineageAccumulatorManager extends LineageAccumulatorManager {
 
+  val SPARK_LISTENER_CLASS = "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd"
+
+
   /**
    * Puts lineage information into the accumulator
    */
@@ -50,4 +59,35 @@
   override def getValue: Lineage = {
     new Lineage(AccumulatorManager.getLineage, SQLLineageManager.getSQLLineage)
   }
+
+  /**
+   * The open-source Spark version cannot yet listen to SparkListenerSQLExecutionEnd
+   *
+   * @param event
+   */
+  def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    if (!FireFrameworkConf.lineageEnable || !FireSparkConf.sparkLineageListenerEnable) {
+      logInfo("Spark lineage collection has been disabled manually; check the configuration to enable it")
+      return
+    }
+
+    tryWithLog {
+      val clazz = Class.forName(this.SPARK_LISTENER_CLASS)
+      if (ReflectionUtils.containsField(clazz, "qe")) {
+        val queryExecution = ReflectionUtils.getFieldValue(event, "qe").asInstanceOf[QueryExecution]
+
+        if (Option(queryExecution)
+          .flatMap(qe => Option(qe.sparkPlan))
+          .forall(_.toString == "LocalTableScan \n")) {
+          logInfo("LocalTableScan onExecutionEnd skipping")
+          return
+        }
+
+        SparkSqlParser.sqlParserWithExecution(queryExecution)
+      } else {
+        logWarning("The current Spark version does not support lineage collection")
+      }
+    }(logger, "", "Failed to parse lineage from the Spark listener", isThrow = false, hook = false)
+
+  }
 }
-- 
Gitee
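Background on the reflection probe in onExecutionEnd: some Spark builds attach the
QueryExecution to SparkListenerSQLExecutionEnd as a `qe` field, while older open-source
releases do not, so the field has to be probed at runtime rather than referenced at
compile time. Below is a minimal standalone sketch of the same probe using plain Java
reflection in place of fire's ReflectionUtils; the object and method names are invented
here for illustration and are not part of the patch:

    import org.apache.spark.scheduler.SparkListenerEvent
    import org.apache.spark.sql.execution.QueryExecution

    object QueryExecutionProbe {
      // Reflectively read the event's `qe` field, if this Spark build declares one.
      // Returns None on builds where SparkListenerSQLExecutionEnd carries no QueryExecution.
      def queryExecutionOf(event: SparkListenerEvent): Option[QueryExecution] = {
        try {
          val field = event.getClass.getDeclaredField("qe")
          field.setAccessible(true)
          Option(field.get(event)).collect { case qe: QueryExecution => qe }
        } catch {
          case _: NoSuchFieldException => None
        }
      }
    }

Probing for the field keeps one listener jar binary-compatible across Spark versions:
on builds without `qe` the probe returns None and lineage parsing is simply skipped,
which mirrors the logWarning branch in the patch.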
From 41bbdf338151c1dd055dde9ee20382b3182f9b83 Mon Sep 17 00:00:00 2001
From: zhanggougou <15651908511@163.com>
Date: Fri, 8 Aug 2025 10:44:53 +0800
Subject: [PATCH 2/2] Optimize Spark lineage parsing
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../com.zto.fire.spark.sql/SparkSqlParser.scala | 11 +++++++++++
 .../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
 .../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
 .../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
 .../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
 .../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
 .../com/zto/fire/spark/sql/SparkSqlParser.scala | 11 +++++++++++
 7 files changed, 77 insertions(+)

diff --git a/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala
index 9b0c3f20..83f17a72 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala
@@ -25,6 +25,7 @@ import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import com.zto.fire.spark.util.TiSparkUtils
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -149,6 +150,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions don't need it yet
+   *
+   * @param sparkPlan
+   * @return
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala
index 8fc202b0..beb45793 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -26,6 +26,7 @@ import com.zto.fire.spark.util.TiSparkUtils
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CreateViewCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
 
@@ -201,6 +202,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions don't need it yet
+   *
+   * @param sparkPlan
+   * @return
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala
index c6642208..97ff2006 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableDropPartitionCommand, CacheTableCommand, CreateViewCommand, RefreshTableCommand, UncacheTableCommand}
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -146,6 +147,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions don't need it yet
+   *
+   * @param sparkPlan
+   * @return
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala
index 4c14fded..fe682209 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -148,6 +149,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions don't need it yet
+   *
+   * @param sparkPlan
+   * @return
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala
index c84eb0ef..49c50756 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -130,6 +131,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions don't need it yet
+   *
+   * @param sparkPlan
+   * @return
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala
index c84eb0ef..49c50756 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -130,6 +131,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions don't need it yet
+   *
+   * @param sparkPlan
+   * @return
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala
index c84eb0ef..49c50756 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -130,6 +131,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions don't need it yet
+   *
+   * @param sparkPlan
+   * @return
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
-- 
Gitee
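To see how the pieces are meant to compose — a listener forwarding
SparkListenerSQLExecutionEnd into a walk over the physical plan — here is a hedged,
self-contained harness. Everything in it (LineageSketch, scanNodes, the local session)
is invented for illustration and sits outside the fire framework; it only prints what
a real implementation would feed into the lineage accumulator:

    import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

    object LineageSketch {
      // Walk a physical plan the way ddlParserWithPlan does: TreeNode.collect with a
      // guard on the node name avoids compile-time coupling to HiveTableScanExec.
      def scanNodes(plan: SparkPlan): Seq[String] = plan.collect {
        case node if node.nodeName.contains("Scan") => node.nodeName
      }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .appName("lineage-sketch")
          .getOrCreate()
        import spark.implicits._

        // Same hook point as FireSparkListener.onOtherEvent in patch 1
        spark.sparkContext.addSparkListener(new SparkListener {
          override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
            case e: SparkListenerSQLExecutionEnd =>
              println(s"SQL execution ${e.executionId} ended at ${e.time}")
            case _ => // ignore all other events
          }
        })

        Seq((1, "a"), (2, "b")).toDF("id", "name").createOrReplaceTempView("t")
        val df = spark.sql("select name, count(1) from t group by name")
        df.collect()
        // Prints LocalTableScan for this toy query; Hive reads would show HiveTableScan
        println(scanNodes(df.queryExecution.sparkPlan).mkString(", "))
        spark.stop()
      }
    }

The LocalTableScan this toy query produces is exactly the plan shape onExecutionEnd
skips in patch 1, since a purely local scan carries no lineage worth recording; real
Hive reads surface as HiveTableScan nodes, which ddlParserWithPlan resolves to table
identifiers.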