diff --git a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala
index b08acc1e6473d83ee3b17ed8fec7c1bcfa4e3ce4..e738c6dfdeebbabb2f1bbf2f2881317a455e22c9 100644
--- a/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala
+++ b/fire-engines/fire-flink/src/main/scala/com/zto/fire/flink/sql/FlinkSqlParserBase.scala
@@ -288,7 +288,9 @@ private[fire] trait FlinkSqlParserBase extends SqlParser {
       }
       val paimonCatalog = this.tableEnv.paimonCatalog.get()
-      paimonCatalog.listTables(tableIdentifier.database).contains(tableIdentifier.table)
+      val checkTable = paimonCatalog.getTable(new ObjectPath(tableIdentifier.database, tableIdentifier.table))
+      // Currently this is the only reliable check: Hive tables are exposed as org.apache.paimon.flink.FormatCatalogTable
+      checkTable != null && checkTable.getClass.getName.equals("org.apache.paimon.flink.DataCatalogTable")
     }(this.logger, catchLog = s"Failed to determine whether ${tableIdentifier} is a Paimon table", hook = false)
   }
 }
diff --git a/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala b/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
index 5573e9ff3b5fc81c16810980597643269a8426b6..b16c11ed91cbcf24e0f4f95ce76be184d05caaba 100644
--- a/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
+++ b/fire-engines/fire-spark/src/main/scala-spark-2.3/com.zto.fire.spark.sql/SparkSqlParser.scala
@@ -30,6 +30,8 @@ import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.hive.execution.{CreateHiveTableAsSelectCommand, InsertIntoHiveDirCommand, InsertIntoHiveTable}
+import org.apache.spark.sql.execution.adaptive.QueryStageInput
+import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
 
 /**
  * Spark SQL parser that extracts databases, tables, partitions and operation types from Spark SQL statements
@@ -159,7 +161,7 @@
    */
   override def ddlParserWithPlan(sparkPlan: SparkPlan): Option[TableIdentifier] = {
     var sinkTable: Option[TableIdentifier] = None
-
+    LineageManager.printLog(s"Start parsing the physical plan: $sparkPlan")
     sparkPlan.collect {
       // Hive table scan info
       case plan if plan.getClass.getName == "org.apache.spark.sql.hive.execution.HiveTableScanExec" =>
@@ -167,9 +169,16 @@
         relationField.setAccessible(true)
         val relation = relationField.get(plan).asInstanceOf[HiveTableRelation]
         val tableIdentifier = this.toFireTableIdentifier(relation.tableMeta.identifier)
-        LineageManager.printLog(s"Resolved select table name: $tableIdentifier")
+        LineageManager.printLog(s"Hive scan resolved select table name: $tableIdentifier")
         this.addCatalog(tableIdentifier, Operation.SELECT)
         sinkTable = Some(tableIdentifier)
+      // Cache scan
+      case p: InMemoryTableScanExec =>
+        handleInMemoryTableScan(p).foreach(x => {
+          LineageManager.printLog(s"Cache scan resolved select table name: $x")
+          this.addCatalog(x, Operation.SELECT)
+          sinkTable = Some(x)
+        })
       // Table write info
       case plan: DataWritingCommandExec =>
         plan.cmd match {
@@ -273,12 +282,33 @@
           case StreamingExplainCommand(queryExecution, extended) =>
           case TruncateTableCommand(tableName, partitionSpec) =>
           case UncacheTableCommand(tableIdent, ifExists) =>
-          case _ =>
+          case _ => LineageManager.printLog(s"Failed to match this statement while parsing the physical plan")
         }
     }
     sinkTable
   }
 
+  /**
+   * Handles InMemoryTableScanExec nodes in the physical plan
+   *
+   * @param plan the physical plan
+   * @return identifiers of the tables that were scanned
+   */
+  def handleInMemoryTableScan(plan: SparkPlan): Seq[TableIdentifier] = {
+    plan match {
+      case p if p.getClass.getName == "org.apache.spark.sql.hive.execution.HiveTableScanExec" =>
+        val relationField = p.getClass.getDeclaredField("relation")
+        relationField.setAccessible(true)
+        val relation = relationField.get(plan).asInstanceOf[HiveTableRelation]
+        val tableIdentifier = this.toFireTableIdentifier(relation.tableMeta.identifier)
+        Seq(tableIdentifier)
+      case p: QueryStageInput => handleInMemoryTableScan(p.childStage)
+      case p: InMemoryTableScanExec => handleInMemoryTableScan(p.relation.child)
+      case p: SparkPlan => p.children.flatMap(handleInMemoryTableScan)
+    }
+  }
+
   /**
    * Determines whether the given table is a temporary table
    */
diff --git a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
index 3c9b3e3b85672aefecef4d8240f0aea51663abb9..8177b57090cdacfcb7d6bebbfd9f20142b1f7aca 100644
--- a/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
+++ b/fire-engines/fire-spark/src/main/scala/com/zto/fire/spark/sql/SparkSqlParserBase.scala
@@ -136,12 +136,12 @@
       sinkTable = this.ddlParserWithPlan(sparkPlan)
     } catch {
       case e: Throwable => {
-        LineageManager.printLog(s"Ignorable exception: real-time lineage failed to parse the SQL, sparkPlan: ${sparkPlan}")
+        LineageManager.printLog(s"Ignorable exception: real-time lineage failed to parse the physical plan, sparkPlan: ${sparkPlan}")
       }
     } finally {
       tryWithLog {
         this.queryParser(queryExecution.optimizedPlan, sinkTable)
-      }(this.logger, catchLog = s"Ignorable exception: real-time lineage failed to parse the SQL, sparkPlan: ${sparkPlan}", isThrow = FireFrameworkConf.lineageDebugEnable, hook = false)
+      }(this.logger, catchLog = s"Ignorable exception: real-time lineage failed to parse the physical plan, sparkPlan: ${sparkPlan}", isThrow = FireFrameworkConf.lineageDebugEnable, hook = false)
     }
   }
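
Note on the FlinkSqlParserBase change: a minimal standalone sketch of the new Paimon check, assuming a Flink Catalog handle; the method name isPaimonDataTable and the try/catch guard are illustrative and not part of this patch.

import org.apache.flink.table.catalog.{Catalog, ObjectPath}

// Paimon data tables come back as org.apache.paimon.flink.DataCatalogTable, while Hive tables
// served through the same catalog come back as org.apache.paimon.flink.FormatCatalogTable,
// so the class name is the discriminator. getTable throws if the table does not exist; the
// guard here plays the role that the surrounding catchLog plays in the patch itself.
def isPaimonDataTable(catalog: Catalog, database: String, table: String): Boolean = {
  try {
    val checkTable = catalog.getTable(new ObjectPath(database, table))
    checkTable != null && checkTable.getClass.getName == "org.apache.paimon.flink.DataCatalogTable"
  } catch {
    case _: Throwable => false
  }
}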
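
Note on the new InMemoryTableScanExec branch in SparkSqlParser: once a Hive-backed DataFrame is cached, later plans scan the cache rather than the Hive table, so the table is no longer visible as a top-level HiveTableScanExec; handleInMemoryTableScan recovers it from the cached relation's child plan. A hypothetical repro, assuming an active SparkSession named spark with Hive support (the table name is made up):

val df = spark.table("dw.user_orders")
df.cache()
df.count()  // materializes the cache

val plan = spark.sql("SELECT * FROM dw.user_orders").queryExecution.executedPlan
// The executed plan now contains an InMemoryTableScanExec instead of a HiveTableScanExec;
// its relation.child still holds the original scan, which is where handleInMemoryTableScan
// (recursing through QueryStageInput wrappers as well) picks up the table identifier.
println(plan)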