diff --git a/fire-engines/fire-spark/pom.xml b/fire-engines/fire-spark/pom.xml
index a4319225e4e0f22792df911e74df4b776b50370b..a892a68fdb1639f4c64bb1c9e4775b2ad6cdce42 100644
--- a/fire-engines/fire-spark/pom.xml
+++ b/fire-engines/fire-spark/pom.xml
@@ -69,6 +69,12 @@
             <version>${spark.version}</version>
             <scope>${maven.scope}</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_${scala.binary.version}</artifactId>
+            <version>${spark.version}</version>
+            <scope>${maven.scope}</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.spark</groupId>
             <artifactId>spark-sql_${scala.binary.version}</artifactId>
diff --git a/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
index a4919d03ede7a52182ab32610da31c7ebe83d396..5573e9ff3b5fc81c16810980597643269a8426b6 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
@@ -24,9 +24,12 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import com.zto.fire.spark.util.TiSparkUtils
 import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.CreateTable
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
 
 /**
  * Spark SQL parser, used to parse the databases, tables, partitions, operation types and other information in Spark SQL statements
@@ -149,6 +152,133 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * Parses database/table and partition information from DDL statements
+   *
+   * @return the sink table, used to maintain the relationships between tables
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    var sinkTable: Option[TableIdentifier] = None
+
+    sparkPlan.collect {
+      // Hive table scan information
+      case plan if plan.getClass.getName == "org.apache.spark.sql.hive.execution.HiveTableScanExec" =>
+        val relationField = plan.getClass.getDeclaredField("relation")
+        relationField.setAccessible(true)
+        val relation = relationField.get(plan).asInstanceOf[HiveTableRelation]
+        val tableIdentifier = this.toFireTableIdentifier(relation.tableMeta.identifier)
+        LineageManager.printLog(s"Parsed select table name: $tableIdentifier")
+        this.addCatalog(tableIdentifier, Operation.SELECT)
+        sinkTable = Some(tableIdentifier)
+      // Table write information
+      case plan: DataWritingCommandExec =>
+        plan.cmd match {
+          case CreateDataSourceTableAsSelectCommand(table, mode, query, outputColumnNames) =>
+            val tableIdentifier = this.toFireTableIdentifier(table.identifier)
+            this.addCatalog(tableIdentifier, Operation.CREATE_TABLE)
+            sinkTable = Some(tableIdentifier)
+          case CreateHiveTableAsSelectCommand(tableDesc, query, outputColumnNames, mode) =>
+            val tableIdentifier = this.toFireTableIdentifier(tableDesc.identifier)
+            this.addCatalog(tableIdentifier, Operation.CREATE_TABLE)
+            sinkTable = Some(tableIdentifier)
+          case InsertIntoHadoopFsRelationCommand(outputPath, staticPartitions, ifPartitionNotExists, partitionColumns, bucketSpec, fileFormat, options, query, mode, catalogTable, fileIndex, outputColumnNames) =>
+          case InsertIntoHiveTable(table, partition, query, overwrite, ifPartitionNotExists, outputColumnNames) => {
+            val tableIdentifier = this.toFireTableIdentifier(table.identifier)
+            this.addCatalog(tableIdentifier, Operation.INSERT_INTO)
+            sinkTable = Some(tableIdentifier)
+          }
+          case InsertIntoHiveDirCommand(isLocal, storage, query, overwrite, outputColumnNames) =>
+        }
+      // Commands
+      case plan: ExecutedCommandExec => plan.cmd match {
+        case AddFileCommand(path) =>
+        case AddJarCommand(path) =>
+        case AlterDatabasePropertiesCommand(databaseName, props) =>
+        case AlterTableAddColumnsCommand(tableName, colsToAdd) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableChangeColumnCommand(tableName, columnName, newColumn) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableDropPartitionCommand(tableName, specs, ifExists, purge, retainData) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableRecoverPartitionsCommand(tableName, cmd) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableRenameCommand(oldName, newName, isView) =>
+          val oldTableIdentifier = this.toFireTableIdentifier(oldName)
+          this.addCatalog(oldTableIdentifier, Operation.ALTER_TABLE)
+          val tableIdentifier = this.toFireTableIdentifier(newName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableRenamePartitionCommand(tableName, oldPartition, newPartition) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableSerDePropertiesCommand(tableName, serdeClassName, serdeProperties, partSpec) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableSetLocationCommand(tableName, partitionSpec, location) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableSetPropertiesCommand(tableName, properties, isView) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterTableUnsetPropertiesCommand(tableName, propKeys, ifExists, isView) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AlterViewAsCommand(tableName, originalText, query) =>
+          val tableIdentifier = this.toFireTableIdentifier(tableName)
+          this.addCatalog(tableIdentifier, Operation.ALTER_TABLE)
+        case AnalyzeColumnCommand(tableIdent, columnNames) =>
+        case AnalyzePartitionCommand(tableIdent, partitionSpec, noscan) =>
+        case AnalyzeTableCommand(tableIdent, noscan) =>
+        case CacheTableCommand(tableIdent, plan, isLazy) =>
+        case ClearCacheCommand() =>
+        case CreateDataSourceTableCommand(table, ignoreIfExists) =>
+        case CreateDatabaseCommand(databaseName, ifNotExists, path, comment, props) =>
+        case CreateFunctionCommand(databaseName, functionName, className, resources, isTemp, ignoreIfExists, replace) =>
+        case CreateTableCommand(table, ignoreIfExists) =>
+        case CreateTableLikeCommand(targetTable, sourceTable, location, ifNotExists) =>
+        case CreateTempViewUsing(tableIdent, userSpecifiedSchema, replace, global, provider, options) =>
+        case CreateViewCommand(name, userSpecifiedColumns, comment, properties, originalText, child, allowExisting, replace, viewType) =>
+        case DescribeColumnCommand(tableName, colNameParts, isExtended) =>
+        case DescribeDatabaseCommand(databaseName, extended) =>
+        case DescribeFunctionCommand(functionName, isExtended) =>
+        case DescribeTableCommand(tableName, partitionSpec, isExtended) =>
+        case DropDatabaseCommand(databaseName, ifExists, cascade) =>
+        case DropFunctionCommand(databaseName, functionName, ifExists, isTemp) =>
+        case DropTableCommand(tableName, ifExists, isView, purge) =>
+        case ExplainCommand(logicalPlan, extended, codegen, cost) =>
+        case InsertIntoDataSourceCommand(logicalRelation, query, overwrite) =>
+        case InsertIntoDataSourceDirCommand(storage, provider, query, overwrite) =>
+        case ListFilesCommand(files) =>
+        case ListJarsCommand(jars) =>
+        case LoadDataCommand(table, path, isLocal, isOverwrite, partition) =>
+        case RefreshResource(path) =>
+        case RefreshTable(tableIdent) =>
+        case ResetCommand =>
+        case SaveIntoDataSourceCommand(query, dataSource, options, mode) =>
+        case SetCommand(kv) =>
+        case SetDatabaseCommand(databaseName) =>
+        case ShowColumnsCommand(databaseName, tableName) =>
+        case ShowCreateTableCommand(tableName) =>
+        case ShowDatabasesCommand(databasePattern) =>
+        case ShowFunctionsCommand(db, pattern, showUserFunctions, showSystemFunctions) =>
+        case ShowPartitionsCommand(tableName, spec) =>
+        case ShowTablePropertiesCommand(tableName, propertyKey) =>
+        case ShowTablesCommand(databaseName, tableIdentifierPattern, isExtended, partitionSpec) =>
+        case StreamingExplainCommand(queryExecution, extended) =>
+        case TruncateTableCommand(tableName, partitionSpec) =>
+        case UncacheTableCommand(tableIdent, ifExists) =>
+        case _ =>
+      }
+    }
+    sinkTable
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala
index 9b0c3f2033d9827a692f8110acadfd50ad465543..83f17a72306d5022579f72c916337b6d9bd8abd5 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-2.4/com.zto.fire.spark.sql/SparkSqlParser.scala
@@ -25,6 +25,7 @@ import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import com.zto.fire.spark.util.TiSparkUtils
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -149,6 +150,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions do not
+   *
+   * @param sparkPlan the physical execution plan
+   * @return always None for this Spark version
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala
index 8fc202b08d5670cea5b3bf230b2db24e9a086d11..beb45793061ef06529ad521c861ba7eee89dc880 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.0/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -26,6 +26,7 @@ import com.zto.fire.spark.util.TiSparkUtils
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.CreateViewCommand
 import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
 
@@ -201,6 +202,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions do not
+   *
+   * @param sparkPlan the physical execution plan
+   * @return always None for this Spark version
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala
index c66422087801dce0886c44a226af1d21b932d810..97ff2006092859b12277fc27a3228a72795c1695 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.1/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableDropPartitionCommand, CacheTableCommand, CreateViewCommand, RefreshTableCommand, UncacheTableCommand}
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -146,6 +147,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions do not
+   *
+   * @param sparkPlan the physical execution plan
+   * @return always None for this Spark version
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala
index 4c14fded53ceebb2829257dd423de70455f7f661..fe682209f17c7773138563cfe06bce199b902e65 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.2/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -148,6 +149,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions do not
+   *
+   * @param sparkPlan the physical execution plan
+   * @return always None for this Spark version
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala
index c84eb0ef8034bb9ba4de185b7e608784c5bbc3eb..49c50756245082f418c1f3a73cad0cfd849bca5c 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.3/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -130,6 +131,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions do not
+   *
+   * @param sparkPlan the physical execution plan
+   * @return always None for this Spark version
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala
index c84eb0ef8034bb9ba4de185b7e608784c5bbc3eb..49c50756245082f418c1f3a73cad0cfd849bca5c 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.4/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -130,6 +131,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions do not
+   *
+   * @param sparkPlan the physical execution plan
+   * @return always None for this Spark version
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala
index c84eb0ef8034bb9ba4de185b7e608784c5bbc3eb..49c50756245082f418c1f3a73cad0cfd849bca5c 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-3.5/com/zto/fire/spark/sql/SparkSqlParser.scala
@@ -24,6 +24,7 @@ import com.zto.fire.common.enu.Operation
 import com.zto.fire.common.lineage.{LineageManager, SQLLineageManager}
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command.{AlterTableRenamePartitionCommand, CreateViewCommand}
 import org.apache.spark.sql.execution.datasources.CreateTable
 
@@ -130,6 +131,16 @@ private[fire] object SparkSqlParser extends SparkSqlParserBase {
     sinkTable
   }
 
+  /**
+   * For now only Spark 2.3 needs to parse the physical execution plan; other versions do not
+   *
+   * @param sparkPlan the physical execution plan
+   * @return always None for this Spark version
+   */
+  override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
+    None
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
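Review note: the per-version source trees above all implement the same hook, and only scala-spark-2.3 does real work. A condensed, self-contained sketch of that pattern follows; the object and class names are illustrative (this is not fire's actual code, and fire's TableIdentifier is stubbed):

import org.apache.spark.sql.execution.SparkPlan

object VersionSplitSketch {

  // Stand-in for com.zto.fire's own TableIdentifier
  case class TableIdentifier(table: String, database: Option[String] = None)

  // Declared once in the shared source tree (SparkSqlParserBase in this patch)
  trait SqlParserBase {
    protected def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier]
  }

  // scala-spark-2.3 variant: walks the physical plan, since on 2.3 the logical
  // plan alone does not surface the Hive scans/writes needed for lineage
  object Parser23 extends SqlParserBase {
    override protected def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
      var sinkTable: Option[TableIdentifier] = None
      sparkPlan.collect {
        // match on the class name, as the patch does, to avoid a hard compile-time dependency
        case p if p.getClass.getSimpleName == "HiveTableScanExec" =>
          sinkTable = Some(TableIdentifier("placeholder")) // the real code resolves the relation via reflection
      }
      sinkTable
    }
  }

  // scala-spark-2.4 through 3.5 variants: a no-op, per the comment in the patch
  object ParserOther extends SqlParserBase {
    override protected def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = None
  }
}

Because each scala-spark-X.Y directory is selected by the build for exactly one Spark version, every profile still compiles against a single concrete SparkSqlParser object.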
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala
index 7c332af7ac8fd195691225c8e873bfbdc389e2bd..7864e267747ca53042d7dbe3571907096a50f827 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/BaseSpark.scala
@@ -87,8 +87,12 @@ trait BaseSpark extends SparkListener with BaseFire with Serializable {
     if (noEmpty(this._spark, this.sc) && this.sc.isStarted) {
       // Trigger one final lineage collection before exiting, so that short-lived spark core jobs do not finish before lineage is collected
       if (FireFrameworkConf.accEnable && FireFrameworkConf.lineageEnable) {
+        // Spark event delivery may lag, so sleep for a fixed period
+        logInfo("<-- spark context shutdown started -->")
+        Thread.sleep(FireFrameworkConf.lineageShutdownSleep * 1000)
         AccumulatorManager.collectDistributeLineage(false)
         Thread.sleep(FireFrameworkConf.lineageShutdownSleep * 1000)
+        logInfo("<-- spark context shutdown completed -->")
       }
 
       if (this.sc.isStarted) {
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala
index 05011385abf66b1d5e234e407cc652f979442aaa..4e0e29421c59eed3ceb6557645eb4e5ca3c6a6df 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/listener/FireSparkListener.scala
@@ -25,9 +25,10 @@ import com.zto.fire.core.util.ErrorToleranceAcc
 import com.zto.fire.spark.BaseSpark
 import com.zto.fire.spark.acc.AccumulatorManager
 import com.zto.fire.spark.conf.FireSparkConf
-import com.zto.fire.spark.sync.SyncSparkEngine
+import com.zto.fire.spark.sync.{SparkLineageAccumulatorManager, SyncSparkEngine}
 import com.zto.fire.spark.util.SparkSingletonFactory
 import org.apache.spark.scheduler._
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
 
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
 
@@ -188,6 +189,12 @@
    */
   override def onUnpersistRDD(unpersistRDD: SparkListenerUnpersistRDD): Unit = this.baseSpark.onUnpersistRDD(unpersistRDD)
 
+  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
+    case e: SparkListenerSQLExecutionEnd => SparkLineageAccumulatorManager.onExecutionEnd(e)
+    case _ => // Ignore
+  }
+
   /**
    * Registers built-in accumulators; executes once per minute after a one-minute delay, 10 times by default
    */
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
index 0835c1cfe82ce0bdf3f58651062cce63802e6f7e..3c9b3e3b85672aefecef4d8240f0aea51663abb9 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
@@ -31,6 +31,7 @@ import com.zto.fire.spark.util.{SparkSingletonFactory, SparkUtils, TiSparkUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.{TableIdentifier => SparkTableIdentifier}
+import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
 
 
 /**
@@ -124,6 +125,26 @@ private[fire] trait SparkSqlParserBase extends SqlParser {
     }
   }
 
+  /**
+   * Parses database/table information from Spark SQL (via the physical execution plan)
+   */
+  @Internal
+  def sqlParserWithExecution(queryExecution: QueryExecution): Unit = {
+    val sparkPlan = queryExecution.sparkPlan
+    var sinkTable: Option[TableIdentifier] = None
+    try {
+      sinkTable = this.ddlParserWithPlan(sparkPlan)
+    } catch {
+      case e: Throwable => {
+        LineageManager.printLog(s"Ignorable exception: real-time lineage SQL parsing failed (${e.getMessage}), sparkPlan: ${sparkPlan}")
+      }
+    } finally {
+      tryWithLog {
+        this.queryParser(queryExecution.optimizedPlan, sinkTable)
+      }(this.logger, catchLog = s"Ignorable exception: real-time lineage SQL parsing failed, sparkPlan: ${sparkPlan}", isThrow = FireFrameworkConf.lineageDebugEnable, hook = false)
+    }
+  }
+
   /**
    * Determines whether the given table is a Hive table
    *
@@ -225,6 +246,14 @@ private[fire] trait SparkSqlParserBase extends SqlParser {
   @Internal
   protected def ddlParser(logicalPlan: LogicalPlan): Option[TableIdentifier]
 
+  /**
+   * Parses database/table and partition information from DDL statements
+   *
+   * @return the sink table, used to maintain the relationships between tables
+   */
+  @Internal
+  protected def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier]
+
   /**
    * Converts Fire's TableIdentifier to Spark's TableIdentifier
    */
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala
index 5b9408c4f57fccdd447ead0a0c354b6c6667d3f7..bb43503df130d8ce663bdbeecf0b9a0444f5f3aa 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sync/SparkLineageAccumulatorManager.scala
@@ -18,11 +18,17 @@
 package com.zto.fire.spark.sync
 
 import com.zto.fire.common.bean.lineage.Lineage
+import com.zto.fire.common.conf.FireFrameworkConf
 import com.zto.fire.common.enu.Datasource
 import com.zto.fire.common.lineage.{DatasourceDesc, SQLLineageManager}
+import com.zto.fire.common.util.ReflectionUtils
 import com.zto.fire.core.sync.LineageAccumulatorManager
 import com.zto.fire.predef._
 import com.zto.fire.spark.acc.AccumulatorManager
+import com.zto.fire.spark.conf.FireSparkConf
+import com.zto.fire.spark.sql.SparkSqlParser
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd
 
 /**
  * Collects data from each executor side to the driver side
@@ -32,6 +38,9 @@ import com.zto.fire.spark.acc.AccumulatorManager
  */
 object SparkLineageAccumulatorManager extends LineageAccumulatorManager {
 
+  val SPARK_LISTENER_CLASS = "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd"
+
   /**
    * Puts lineage information into the accumulator
    */
@@ -50,4 +59,35 @@ object SparkLineageAccumulatorManager extends LineageAccumulatorManager {
   override def getValue: Lineage = {
     new Lineage(AccumulatorManager.getLineage, SQLLineageManager.getSQLLineage)
   }
+
+  /**
+   * For now the open-source version cannot listen for SparkListenerSQLExecutionEnd directly
+   *
+   * @param event the SQL execution end event
+   */
+  def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
+    if (!FireFrameworkConf.lineageEnable || !FireSparkConf.sparkLineageListenerEnable) {
+      logInfo("Spark lineage collection has been disabled manually; check the configuration to enable it")
+      return
+    }
+
+    tryWithLog {
+      val clazz = Class.forName(this.SPARK_LISTENER_CLASS)
+      if (ReflectionUtils.containsField(clazz, "qe")) {
+        val queryExecution = ReflectionUtils.getFieldValue(event, "qe").asInstanceOf[QueryExecution]
+
+        if (Option(queryExecution)
+          .flatMap(qe => Option(qe.sparkPlan))
+          .forall(_.toString == "LocalTableScan \n")) {
+          logInfo("LocalTableScan onExecutionEnd skipping")
+          return
+        }
+
+        SparkSqlParser.sqlParserWithExecution(queryExecution)
+      } else {
+        logWarning("The current spark version does not support lineage collection")
+      }
+    }(logger, "", "Failed to parse lineage information from the spark listener", isThrow = false, hook = false)
+
+  }
 }
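Review note: the listener wiring this patch introduces can be reproduced as a minimal standalone sketch, shown below. Everything except Spark's own classes is illustrative: the listener class name and the println placeholders are assumptions, and the real patch routes the QueryExecution to SparkSqlParser.sqlParserWithExecution instead of printing.

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd

class LineageSketchListener extends SparkListener {

  // Same dispatch as FireSparkListener.onOtherEvent above
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: SparkListenerSQLExecutionEnd => handleExecutionEnd(e)
    case _ => // other events are irrelevant for lineage
  }

  private def handleExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit = {
    // The qe field is not part of the stable API on every Spark version, hence the
    // reflective lookup -- the same reason the patch guards with ReflectionUtils.containsField
    val qeField =
      try Some(classOf[SparkListenerSQLExecutionEnd].getDeclaredField("qe"))
      catch { case _: NoSuchFieldException => None }

    qeField.foreach { field =>
      field.setAccessible(true)
      Option(field.get(event).asInstanceOf[QueryExecution]).foreach { qe =>
        // The patch hands qe to SparkSqlParser.sqlParserWithExecution here:
        // sparkPlan drives the Spark 2.3 DDL parsing, optimizedPlan drives queryParser
        println(s"physical plan:\n${qe.sparkPlan}")
        println(s"optimized plan:\n${qe.optimizedPlan}")
      }
    }
  }
}

Registered via sparkContext.addSparkListener(new LineageSketchListener) or --conf spark.extraListeners=..., this mirrors the FireSparkListener -> SparkLineageAccumulatorManager -> SparkSqlParser flow without the fire framework dependencies.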